* progress in moving all the fileaccess over to a wrapper system that can do fd based...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
/* Fallback definition of struct timeval for builds where the enclosing
 * preprocessor guards are active; filled in by the local gettimeofday(). */
struct timeval {
        time_t tv_sec;   /* seconds part of the timestamp */
        long   tv_usec;  /* microseconds part of the timestamp */
};
40 #endif
41
/* Placeholder time-zone record used only as the type of the second
 * parameter of the local gettimeofday() replacement. */
struct __timezone {
        int tz_minuteswest;  /* minutes west of Greenwich */
        int tz_dsttime;      /* type of DST correction */
};
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the microsecond
 * part must never be negative.  If it is, one second is borrowed from
 * tv_sec and added to tv_usec as 1000000 microseconds.  A non-negative
 * tv_usec leaves the value untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;

        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
71
72 /* Local prototypes */
73 int LockRRD(int in_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 int UNUSED(in_file),
80 #else
81 int in_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, int in_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
92                                         info_t*);
93
/* Yield X unless X is NaN, in which case yield Y.
 * X is expanded twice, so it must not have side effects.
 * The expansion deliberately carries no trailing semicolon so the macro
 * can be used as an ordinary expression; callers terminate the
 * statement themselves. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, (const char **)(argv + optind + 1), result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
/*
 * Update entry point taking a getopt-style argument vector.
 *
 * Recognised option: -t / --template <tmplt>.  After the options at
 * least two arguments (file name and one data argument) are required.
 *
 * Returns -1, with the rrd error message set, when an unknown option is
 * seen or too few arguments remain; otherwise returns whatever
 * rrd_update_r() returns for the remaining arguments.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmplt = NULL;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return -1;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    /* argv[optind] is the file name, everything after it is data */
    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
187
188 int
189 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
190 {
191    return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
193
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     rrd_t            rrd;
238     time_t           current_time = 0;
239     time_t           rra_time = 0;       /* time of update for a RRA */
240     unsigned long    current_time_usec=0;/* microseconds part of current time */
241     struct timeval   tmp_time;           /* used for time conversion */
242
243     char             **updvals;
244     int              schedule_smooth = 0;
245         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
246                                          /* a vector of future Holt-Winters seasonal coefs */
247     unsigned long    elapsed_pdp_st;
248                                          /* number of elapsed PDP steps since last update */
249     unsigned long    *rra_step_cnt = NULL;
250                                          /* number of rows to be updated in an RRA for a data
251                                           * value. */
252     unsigned long    start_pdp_offset;
253                                          /* number of PDP steps since the last update that
254                                           * are assigned to the first CDP to be generated
255                                           * since the last update. */
256     unsigned short   scratch_idx;
257                                          /* index into the CDP scratch array */
258     enum cf_en       current_cf;
259                                          /* numeric id of the current consolidation function */
260     rpnstack_t       rpnstack; /* used for COMPUTE DS */
261     int              version;  /* rrd version */
262     char             *endptr; /* used in the conversion */
263     rrd_file_t*      rrd_file;
264
265     rpnstack_init(&rpnstack);
266
267     /* need at least 1 arguments: data. */
268     if (argc < 1) {
269         rrd_set_error("Not enough arguments");
270         return -1;
271     }
272
273     rrd_file = rrd_open(filename,&rrd, RRD_READWRITE);
274     if (rrd_file == NULL) {
275         return -1;
276     }
277
278     /* initialize time */
279     version = atoi(rrd.stat_head->version);
280     gettimeofday(&tmp_time, 0);
281     normalize_time(&tmp_time);
282     current_time = tmp_time.tv_sec;
283     if(version >= 3) {
284         current_time_usec = tmp_time.tv_usec;
285     }
286     else {
287         current_time_usec = 0;
288     }
289
290     rra_current = rra_start = rra_begin = rrd_file->header_len;
291     /* This is defined in the ANSI C standard, section 7.9.5.3:
292
293         When a file is opened with udpate mode ('+' as the second
294         or third character in the ... list of mode argument
295         variables), both input and output may be performed on the
296         associated stream.  However, ...  input may not be directly
297         followed by output without an intervening call to a file
298         positioning function, unless the input operation encounters
299         end-of-file. */
300 #if 0//def HAVE_MMAP
301 rrd_filesize = rrd_file->file_size;
302     fseek(rrd_file->fd, 0, SEEK_END);
303     rrd_filesize = ftell(rrd_file->fd);
304     fseek(rrd_file->fd, rra_current, SEEK_SET);
305 #else
306 //    fseek(rrd_file->fd, 0, SEEK_CUR);
307 #endif
308
309     
310     /* get exclusive lock to whole file.
311      * lock gets removed when we close the file.
312      */
313     if (LockRRD(rrd_file->fd) != 0) {
314       rrd_set_error("could not lock RRD");
315       rrd_free(&rrd);
316       close(rrd_file->fd);
317       return(-1);   
318     } 
319
320     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
321         rrd_set_error("allocating updvals pointer array");
322         rrd_free(&rrd);
323         close(rrd_file->fd);
324         return(-1);
325     }
326
327     if ((pdp_temp = malloc(sizeof(rrd_value_t)
328                            *rrd.stat_head->ds_cnt))==NULL){
329         rrd_set_error("allocating pdp_temp ...");
330         free(updvals);
331         rrd_free(&rrd);
332         close(rrd_file->fd);
333         return(-1);
334     }
335
336     if ((tmpl_idx = malloc(sizeof(unsigned long)
337                            *(rrd.stat_head->ds_cnt+1)))==NULL){
338         rrd_set_error("allocating tmpl_idx ...");
339         free(pdp_temp);
340         free(updvals);
341         rrd_free(&rrd);
342         close(rrd_file->fd);
343         return(-1);
344     }
345     /* initialize tmplt redirector */
346     /* default config example (assume DS 1 is a CDEF DS)
347        tmpl_idx[0] -> 0; (time)
348        tmpl_idx[1] -> 1; (DS 0)
349        tmpl_idx[2] -> 3; (DS 2)
350        tmpl_idx[3] -> 4; (DS 3) */
351     tmpl_idx[0] = 0; /* time */
352     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
353         {
354            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
355               tmpl_idx[ii++]=i;
356         }
357     tmpl_cnt= ii;
358
359     if (tmplt) {
360         /* we should work on a writeable copy here */
361         char *dsname;
362         unsigned int tmpl_len;
363         char *tmplt_copy = strdup(tmplt);
364         dsname = tmplt_copy;
365         tmpl_cnt = 1; /* the first entry is the time */
366         tmpl_len = strlen(tmplt_copy);
367         for(i=0;i<=tmpl_len ;i++) {
368             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
369                 tmplt_copy[i] = '\0';
370                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
371                     rrd_set_error("tmplt contains more DS definitions than RRD");
372                     free(updvals); free(pdp_temp);
373                     free(tmpl_idx); rrd_free(&rrd);
374                     close(rrd_file->fd); return(-1);
375                 }
376                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
377                     rrd_set_error("unknown DS name '%s'",dsname);
378                     free(updvals); free(pdp_temp);
379                     free(tmplt_copy);
380                     free(tmpl_idx); rrd_free(&rrd);
381                     close(rrd_file->fd); return(-1);
382                 } else {
383                   /* the first element is always the time */
384                   tmpl_idx[tmpl_cnt-1]++; 
385                   /* go to the next entry on the tmplt_copy */
386                   dsname = &tmplt_copy[i+1];
387                   /* fix the damage we did before */
388                   if (i<tmpl_len) {
389                      tmplt_copy[i]=':';
390                   } 
391
392                 }
393             }       
394         }
395         free(tmplt_copy);
396     }
397     if ((pdp_new = malloc(sizeof(rrd_value_t)
398                           *rrd.stat_head->ds_cnt))==NULL){
399         rrd_set_error("allocating pdp_new ...");
400         free(updvals);
401         free(pdp_temp);
402         free(tmpl_idx);
403         rrd_free(&rrd);
404         close(rrd_file->fd);
405         return(-1);
406     }
407
408 #if 0//def HAVE_MMAP
409     rrd_mmaped_file = mmap(0, 
410                         rrd_file->file_len, 
411                         PROT_READ | PROT_WRITE, 
412                         MAP_SHARED, 
413                         fileno(in_file), 
414                         0);
415     if (rrd_mmaped_file == MAP_FAILED) {
416         rrd_set_error("error mmapping file %s", filename);
417         free(updvals);
418         free(pdp_temp);
419         free(tmpl_idx);
420         rrd_free(&rrd);
421         close(rrd_file->fd);
422         return(-1);
423     }
424 #ifdef USE_MADVISE
425     /* when we use mmaping we tell the kernel the mmap equivalent
426        of POSIX_FADV_RANDOM */
427     madvise(rrd_mmaped_file,rrd_filesize,POSIX_MADV_RANDOM);
428 #endif
429 #endif
430     /* loop through the arguments. */
431     for(arg_i=0; arg_i<argc;arg_i++) {
432         char *stepper = strdup(argv[arg_i]);
433         char *step_start = stepper;
434         char *p;
435         char *parsetime_error = NULL;
436         enum {atstyle, normal} timesyntax;
437         struct rrd_time_value ds_tv;
438         if (stepper == NULL){
439                 rrd_set_error("failed duplication argv entry");
440                 free(step_start);
441                 free(updvals);
442                 free(pdp_temp);  
443                 free(tmpl_idx);
444                 rrd_free(&rrd);
445 #ifdef HAVE_MMAP
446                 rrd_close(rrd_file);
447 #endif
448                 close(rrd_file->fd);
449                 return(-1);
450          }
451         /* initialize all ds input to unknown except the first one
452            which has always got to be set */
453         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
454         updvals[0]=stepper;
455         /* separate all ds elements; first must be examined separately
456            due to alternate time syntax */
457         if ((p=strchr(stepper,'@'))!=NULL) {
458             timesyntax = atstyle;
459             *p = '\0';
460             stepper = p+1;
461         } else if ((p=strchr(stepper,':'))!=NULL) {
462             timesyntax = normal;
463             *p = '\0';
464             stepper = p+1;
465         } else {
466             rrd_set_error("expected timestamp not found in data source from %s",
467                           argv[arg_i]);
468             free(step_start);
469             break;
470         }
471         ii=1;
472         updvals[tmpl_idx[ii]] = stepper;
473         while (*stepper) {
474             if (*stepper == ':') {
475                 *stepper = '\0';
476                 ii++;
477                 if (ii<tmpl_cnt){                   
478                     updvals[tmpl_idx[ii]] = stepper+1;
479                 }
480             }
481             stepper++;
482         }
483
484         if (ii != tmpl_cnt-1) {
485             rrd_set_error("expected %lu data source readings (got %lu) from %s",
486                           tmpl_cnt-1, ii, argv[arg_i]);
487             free(step_start);
488             break;
489         }
490         
491         /* get the time from the reading ... handle N */
492         if (timesyntax == atstyle) {
493             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
494                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
495                 free(step_start);
496                 break;
497             }
498             if (ds_tv.type == RELATIVE_TO_END_TIME ||
499                 ds_tv.type == RELATIVE_TO_START_TIME) {
500                 rrd_set_error("specifying time relative to the 'start' "
501                               "or 'end' makes no sense here: %s",
502                               updvals[0]);
503                 free(step_start);
504                 break;
505             }
506
507             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
508             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
509             
510         } else if (strcmp(updvals[0],"N")==0){
511             gettimeofday(&tmp_time, 0);
512             normalize_time(&tmp_time);
513             current_time = tmp_time.tv_sec;
514             current_time_usec = tmp_time.tv_usec;
515         } else {
516             double tmp;
517             tmp = strtod(updvals[0], 0);
518             current_time = floor(tmp);
519             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
520         }
521         /* dont do any correction for old version RRDs */
522         if(version < 3) 
523             current_time_usec = 0;
524         
525         if(current_time < rrd.live_head->last_up || 
526           (current_time == rrd.live_head->last_up && 
527            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
528             rrd_set_error("illegal attempt to update using time %ld when "
529                           "last update time is %ld (minimum one second step)",
530                           current_time, rrd.live_head->last_up);
531             free(step_start);
532             break;
533         }
534         
535         
536         /* seek to the beginning of the rra's */
537         if (rra_current != rra_begin) {
538 #ifndef HAVE_MMAP
539             if(rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
540                 rrd_set_error("seek error in rrd");
541                 free(step_start);
542                 break;
543             }
544 #endif
545             rra_current = rra_begin;
546         }
547         rra_start = rra_begin;
548
549         /* when was the current pdp started */
550         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
551         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
552
553         /* when did the last pdp_st occur */
554         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
555         occu_pdp_st = current_time - occu_pdp_age;
556
557         /* interval = current_time - rrd.live_head->last_up; */
558         interval    = (double)(current_time - rrd.live_head->last_up) 
559                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
560
561         if (occu_pdp_st > proc_pdp_st){
562             /* OK we passed the pdp_st moment*/
563             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
564                                                               * occurred before the latest
565                                                               * pdp_st moment*/
566             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
567             post_int = occu_pdp_age;                         /* how much after it */
568             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
569         } else {
570             pre_int = interval;
571             post_int = 0;
572         }
573
574 #ifdef DEBUG
575         printf(
576                "proc_pdp_age %lu\t"
577                "proc_pdp_st %lu\t" 
578                "occu_pfp_age %lu\t" 
579                "occu_pdp_st %lu\t"
580                "int %lf\t"
581                "pre_int %lf\t"
582                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
583                 occu_pdp_age, occu_pdp_st,
584                interval, pre_int, post_int);
585 #endif
586     
587         /* process the data sources and update the pdp_prep 
588          * area accordingly */
589         for(i=0;i<rrd.stat_head->ds_cnt;i++){
590             enum dst_en dst_idx;
591             dst_idx= dst_conv(rrd.ds_def[i].dst);
592
593             /* make sure we do not build diffs with old last_ds values */
594             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
595                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
596                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
597             }
598
599             /* NOTE: DST_CDEF should never enter this if block, because
600              * updvals[i+1][0] is initialized to 'U'; unless the caller
601              * accidently specified a value for the DST_CDEF. To handle 
602               * this case, an extra check is required. */
603
604             if((updvals[i+1][0] != 'U') &&
605                    (dst_idx != DST_CDEF) &&
606                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
607                double rate = DNAN;
608                /* the data source type defines how to process the data */
609                 /* pdp_new contains rate * time ... eg the bytes
610                  * transferred during the interval. Doing it this way saves
611                  * a lot of math operations */
612                 
613
614                 switch(dst_idx){
615                 case DST_COUNTER:
616                 case DST_DERIVE:
617                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
619                             if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
620                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
621                                  break;
622                             }
623                        }
624                        if (rrd_test_error()){
625                             break;
626                        }
627                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
628                        if(dst_idx == DST_COUNTER) {
629                           /* simple overflow catcher suggested by Andres Kroonmaa */
630                           /* this will fail terribly for non 32 or 64 bit counters ... */
631                           /* are there any others in SNMP land ? */
632                           if (pdp_new[i] < (double)0.0 ) 
633                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
634                           if (pdp_new[i] < (double)0.0 ) 
635                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
636                        }
637                        rate = pdp_new[i] / interval;
638                     }
639                    else {
640                      pdp_new[i]= DNAN;          
641                    }
642                    break;
643                 case DST_ABSOLUTE:
644                     errno = 0;
645                     pdp_new[i] = strtod(updvals[i+1],&endptr);
646                     if (errno > 0){
647                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
648                         break;
649                     };
650                     if (endptr[0] != '\0'){
651                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
652                         break;
653                     }
654                     rate = pdp_new[i] / interval;                 
655                     break;
656                 case DST_GAUGE:
657                     errno = 0;
658                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
659                     if (errno > 0){
660                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661                         break;
662                     };
663                     if (endptr[0] != '\0'){
664                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665                         break;
666                     }
667                     rate = pdp_new[i] / interval;                  
668                     break;
669                 default:
670                     rrd_set_error("rrd contains unknown DS type : '%s'",
671                                   rrd.ds_def[i].dst);
672                     break;
673                 }
674                 /* break out of this for loop if the error string is set */
675                 if (rrd_test_error()){
676                     break;
677                 }
678                /* make sure pdp_temp is neither too large or too small
679                 * if any of these occur it becomes unknown ...
680                 * sorry folks ... */
681                if ( ! isnan(rate) && 
682                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
684                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
686                   pdp_new[i] = DNAN;
687                }               
688             } else {
689                 /* no news is news all the same */
690                 pdp_new[i] = DNAN;
691             }
692
693             
694             /* make a copy of the command line argument for the next run */
695 #ifdef DEBUG
696             fprintf(stderr,
697                     "prep ds[%lu]\t"
698                     "last_arg '%s'\t"
699                     "this_arg '%s'\t"
700                     "pdp_new %10.2f\n",
701                     i,
702                     rrd.pdp_prep[i].last_ds,
703                     updvals[i+1], pdp_new[i]);
704 #endif
705             strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
706             rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
707         }
708         /* break out of the argument parsing loop if the error_string is set */
709         if (rrd_test_error()){
710             free(step_start);
711             break;
712         }
713         /* has a pdp_st moment occurred since the last run ? */
714
715         if (proc_pdp_st == occu_pdp_st){
716             /* no we have not passed a pdp_st moment. therefore update is simple */
717
718             for(i=0;i<rrd.stat_head->ds_cnt;i++){
719                 if(isnan(pdp_new[i])) {            
720                     /* this is not realy accurate if we use subsecond data arival time
721                        should have thought of it when going subsecond resolution ...
722                        sorry next format change we will have it! */
723                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
724                 } else {
725                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
726                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
727                      } else {
728                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
729                      }
730                 }
731 #ifdef DEBUG
732                 fprintf(stderr,
733                         "NO PDP  ds[%lu]\t"
734                         "value %10.2f\t"
735                         "unkn_sec %5lu\n",
736                         i,
737                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
738                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740             }   
741         } else {
742             /* an pdp_st has occurred. */
743
744             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
745              * occurred up to the last run.        
746             pdp_new[] contains rate*seconds from the latest run.
747             pdp_temp[] will contain the rate for cdp */
748
749             for(i=0;i<rrd.stat_head->ds_cnt;i++){
750                 /* update pdp_prep to the current pdp_st. */
751                 double pre_unknown = 0.0;               
752                 if(isnan(pdp_new[i]))
753                     /* a final bit of unkonwn to be added bevore calculation
754                      * we use a tempaorary variable for this so that we 
755                      * don't have to turn integer lines before using the value */                
756                     pre_unknown = pre_int;
757                 else {
758                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
759                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
760                      } else {
761                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
762                      }
763                  }
764                 
765
766                 /* if too much of the pdp_prep is unknown we dump it */
767                 if ( 
768                     /* removed because this does not agree with the definition
769                        a heart beat can be unknown */
770                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
771                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
772                     /* if the interval is larger than mrhb we get NAN */
773                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
774                     (occu_pdp_st-proc_pdp_st <= 
775                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
776                     pdp_temp[i] = DNAN;
777                 } else {
778                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
779                         / ((double)(occu_pdp_st - proc_pdp_st
780                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
781                             -pre_unknown);
782                 }
783
784                 /* process CDEF data sources; remember each CDEF DS can
785                  * only reference other DS with a lower index number */
786             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
787                    rpnp_t *rpnp;
788                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
789                    /* substitute data values for OP_VARIABLE nodes */
790                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
791                    {
792                           if (rpnp[ii].op == OP_VARIABLE) {
793                                  rpnp[ii].op = OP_NUMBER;
794                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
795                           }
796                    }
797                    /* run the rpn calculator */
798                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
799                           free(rpnp);
800                           break; /* exits the data sources pdp_temp loop */
801                    }
802                 }
803         
804                 /* make pdp_prep ready for the next run */
805                 if(isnan(pdp_new[i])){
806                     /* this is not really accurate if we use subsecond data arrival time
807                        should have thought of it when going subsecond resolution ...
808                        sorry next format change we will have it! */
809                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
810                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
811                 } else {
812                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
813                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
814                         pdp_new[i]/interval*post_int;
815                 }
816
817 #ifdef DEBUG
818                 fprintf(stderr,
819                         "PDP UPD ds[%lu]\t"
820                         "pdp_temp %10.2f\t"
821                         "new_prep %10.2f\t"
822                         "new_unkn_sec %5lu\n",
823                         i, pdp_temp[i],
824                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
825                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
826 #endif
827             }
828
829                 /* if there were errors during the last loop, bail out here */
830             if (rrd_test_error()){
831                free(step_start);
832                break;
833             }
834
835                 /* compute the number of elapsed pdp_st moments */
836                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
837 #ifdef DEBUG
838                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
839 #endif
840                 if (rra_step_cnt == NULL)
841                 {
842                    rra_step_cnt = (unsigned long *) 
843                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
844                 }
845
846             for(i = 0, rra_start = rra_begin;
847                 i < rrd.stat_head->rra_cnt;
848             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
849                 i++)
850                 {
851                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
852                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
853                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
854         if (start_pdp_offset <= elapsed_pdp_st) {
855            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
856                       rrd.rra_def[i].pdp_cnt + 1;
857             } else {
858                    rra_step_cnt[i] = 0;
859                 }
860
861                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
862                 {
863                    /* If this is a bulk update, we need to skip ahead in the seasonal
864                         * arrays so that they will be correct for the next observed value;
865                         * note that for the bulk update itself, no update will occur to
866                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
867                         * be set to DNAN. */
868            if (rra_step_cnt[i] > 2) 
869                    {
870                           /* skip update by resetting rra_step_cnt[i],
871                            * note that this is not data source specific; this is due
872                            * to the bulk update, not a DNAN value for the specific data
873                            * source. */
874                           rra_step_cnt[i] = 0;
875               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
876                              &last_seasonal_coef);
877                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
878                              &seasonal_coef);
879                    }
880                 
881                   /* periodically run a smoother for seasonal effects */
882                   /* Need to use first cdp parameter buffer to track
883                    * burnin (burnin requires a specific smoothing schedule).
884                    * The CDP_init_seasonal parameter is really an RRA level,
885                    * not a data source within RRA level parameter, but the rra_def
886                    * is read only for rrd_update (not flushed to disk). */
887                   iii = i*(rrd.stat_head -> ds_cnt);
888                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
889                           <= BURNIN_CYCLES)
890                   {
891                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
892                                  > rrd.rra_def[i].row_cnt - 1) {
893                            /* mark off one of the burnin cycles */
894                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
895                        schedule_smooth = 1;
896                          }  
897                   } else {
898                          /* someone has no doubt invented a trick to deal with this
899                           * wrap around, but at least this code is clear. */
900                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
901                              rrd.rra_ptr[i].cur_row)
902                          {
903                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
904                                   * mapping between PDP and CDP */
905                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
906                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
907                                  {
908 #ifdef DEBUG
909                                         fprintf(stderr,
910                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
911                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
912                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
913 #endif
914                                         schedule_smooth = 1;
915                                  }
916              } else {
917                                  /* can't rely on negative numbers because we are working with
918                                   * unsigned values */
919                                  /* Don't need modulus here. If we've wrapped more than once, only
920                                   * one smooth is executed at the end. */
921                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
922                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
923                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
924                                  {
925 #ifdef DEBUG
926                                         fprintf(stderr,
927                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
928                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
929                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
930 #endif
931                                         schedule_smooth = 1;
932                                  }
933                          }
934                   }
935
936               rra_current = rrd_tell(rrd_file);
937                 } /* if cf is DEVSEASONAL or SEASONAL */
938
939         if (rrd_test_error()) break;
940
941                     /* update CDP_PREP areas */
942                     /* loop over data sources within each RRA */
943                     for(ii = 0;
944                         ii < rrd.stat_head->ds_cnt;
945                         ii++)
946                         {
947                         
948                         /* iii indexes the CDP prep area for this data source within the RRA */
949                         iii=i*rrd.stat_head->ds_cnt+ii;
950
951                         if (rrd.rra_def[i].pdp_cnt > 1) {
952                           
953                            if (rra_step_cnt[i] > 0) {
954                            /* If we are in this block, at least 1 CDP value will be written to
955                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
956                                 * to be written, then the "fill in" value is the CDP_secondary_val
957                                 * entry. */
958                                   if (isnan(pdp_temp[ii]))
959                   {
960                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
961                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
962                                   } else {
963                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
964                                           * CDP data entries. No matter the CF, the value is the same because
965                                           * the average, max, min, and last of a list of identical values is
966                                           * the same, namely, the value itself. */
967                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
968                                   }
969                      
970                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
971                                       > rrd.rra_def[i].pdp_cnt*
972                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
973                                   {
974                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
975                                          /* initialize carry over */
976                                          if (current_cf == CF_AVERAGE) {
977                                                    if (isnan(pdp_temp[ii])) { 
978                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
979                                                    } else {
980                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
981                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
982                                                    }
983                                          } else {
984                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
985                                          }
986                                   } else {
987                                          rrd_value_t cum_val, cur_val; 
988                                      switch (current_cf) {
989                                                 case CF_AVERAGE:
990                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
991                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
992                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
993                                                (cum_val + cur_val * start_pdp_offset) /
994                                            (rrd.rra_def[i].pdp_cnt
995                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
996                                                    /* initialize carry over value */
997                                                    if (isnan(pdp_temp[ii])) { 
998                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
999                                                    } else {
1000                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1001                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1002                                                    }
1003                                                    break;
1004                                                 case CF_MAXIMUM:
1005                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1006                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1007 #ifdef DEBUG
1008                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1009                                                           isnan(pdp_temp[ii])) {
1010                                                      fprintf(stderr,
1011                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1012                                                                 i,ii);
1013                                                          exit(-1);
1014                                                   }
1015 #endif
1016                                                   if (cur_val > cum_val)
1017                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1018                                                   else
1019                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1020                                                   /* initialize carry over value */
1021                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1022                                                   break;
1023                                                 case CF_MINIMUM:
1024                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1025                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1026 #ifdef DEBUG
1027                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1028                                                           isnan(pdp_temp[ii])) {
1029                                                      fprintf(stderr,
1030                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1031                                                                 i,ii);
1032                                                          exit(-1);
1033                                                   }
1034 #endif
1035                                                   if (cur_val < cum_val)
1036                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1037                                                   else
1038                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1039                                                   /* initialize carry over value */
1040                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1041                                                   break;
1042                                                 case CF_LAST:
1043                                                 default:
1044                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1045                                                    /* initialize carry over value */
1046                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047                                                 break;
1048                                          }
1049                                   } /* endif meets xff value requirement for a valid value */
1050                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1051                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1052                                   if (isnan(pdp_temp[ii]))
1053                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1054                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1055                                   else
1056                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1057                } else  /* rra_step_cnt[i]  == 0 */
1058                            {
1059 #ifdef DEBUG
1060                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1061                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1062                                          i,ii);
1063                                   } else {
1064                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1065                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066                                   }
1067 #endif
1068                                   if (isnan(pdp_temp[ii])) {
1069                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1070                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1071                                   {
1072                                          if (current_cf == CF_AVERAGE) {
1073                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1074                                                    elapsed_pdp_st;
1075                                          } else {
1076                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1077                                          }
1078 #ifdef DEBUG
1079                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1080                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1081 #endif
1082                                   } else {
1083                                          switch (current_cf) {
1084                                          case CF_AVERAGE:
1085                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1086                                                    elapsed_pdp_st;
1087                                                 break;
1088                                          case CF_MINIMUM:
1089                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1090                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091                                                 break; 
1092                                          case CF_MAXIMUM:
1093                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1094                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1095                                                 break; 
1096                                          case CF_LAST:
1097                                          default:
1098                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099                                                 break;
1100                                          }
1101                                   }
1102                            }
1103                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1104                            if (elapsed_pdp_st > 2)
1105                            {
1106                                    switch (current_cf) {
1107                                    case CF_AVERAGE:
1108                                    default:
1109                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1110                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1111                                           break;
1112                    case CF_SEASONAL:
1113                                    case CF_DEVSEASONAL:
1114                                           /* need to update cached seasonal values, so they are consistent
1115                                            * with the bulk update */
1116                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1117                                            * CDP_last_deviation are the same. */
1118                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1119                                                  last_seasonal_coef[ii];
1120                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1121                                                  seasonal_coef[ii];
1122                                           break;
1123                    case CF_HWPREDICT:
1124                                           /* need to update the null_count and last_null_count.
1125                                            * even do this for non-DNAN pdp_temp because the
1126                                            * algorithm is not learning from batch updates. */
1127                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1128                                                  elapsed_pdp_st;
1129                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1130                                                  elapsed_pdp_st - 1;
1131                                           /* fall through */
1132                                    case CF_DEVPREDICT:
1133                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1134                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1135                                           break;
1136                    case CF_FAILURES:
1137                                           /* do not count missed bulk values as failures */
1138                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1139                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1140                                           /* need to reset violations buffer.
1141                                            * could do this more carefully, but for now, just
1142                                            * assume a bulk update wipes away all violations. */
1143                       erase_violations(&rrd, iii, i);
1144                                           break;
1145                                    }
1146                            } 
1147                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1148
1149                         if (rrd_test_error()) break;
1150
1151                         } /* endif data sources loop */
1152         } /* end RRA Loop */
1153
1154                 /* this loop is only entered if elapsed_pdp_st < 3 */
1155                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1156                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1157                 {
1158                for(i = 0, rra_start = rra_begin;
1159                    i < rrd.stat_head->rra_cnt;
1160                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1161                    i++)
1162                    {
1163                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1164
1165                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1166                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1167                           {
1168                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1169                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1170                                 &seasonal_coef);
1171                  rra_current = rrd_tell(rrd_file);
1172                           }
1173                           if (rrd_test_error()) break;
1174                       /* loop over data sources within each RRA */
1175                       for(ii = 0;
1176                           ii < rrd.stat_head->ds_cnt;
1177                           ii++)
1178                           {
1179                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1180                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1181                                     scratch_idx, seasonal_coef);
1182                           }
1183            } /* end RRA Loop */
1184                    if (rrd_test_error()) break;
1185             } /* end elapsed_pdp_st loop */
1186
1187                 if (rrd_test_error()) break;
1188
1189                 /* Ready to write to disk */
1190                 /* Move sequentially through the file, writing one RRA at a time.
1191                  * Note this architecture divorces the computation of CDP with
1192                  * flushing updated RRA entries to disk. */
1193             for(i = 0, rra_start = rra_begin;
1194                 i < rrd.stat_head->rra_cnt;
1195             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1196                 i++) {
1197                 /* is there anything to write for this RRA? If not, continue. */
1198         if (rra_step_cnt[i] == 0) continue;
1199
1200                 /* write the first row */
1201 #ifdef DEBUG
1202         fprintf(stderr,"  -- RRA Preseek %ld\n",rrd_file->pos);
1203 #endif
1204             rrd.rra_ptr[i].cur_row++;
1205             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1206                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1207                 /* position on the first row */
1208                 rra_pos_tmp = rra_start +
1209                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1210                 if(rra_pos_tmp != rra_current) {
1211 #ifndef HAVE_MMAP
1212                    if(rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1213                       rrd_set_error("seek error in rrd");
1214                       break;
1215                    }
1216 #endif
1217                    rra_current = rra_pos_tmp;
1218                 }
1219
1220 #ifdef DEBUG
1221             fprintf(stderr,"  -- RRA Postseek %ld\n",rrd_file->pos);
1222 #endif
1223                 scratch_idx = CDP_primary_val;
1224                 if (pcdp_summary != NULL)
1225                 {
1226                    rra_time = (current_time - current_time 
1227                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1228                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1229                 }
1230 #ifdef HAVE_MMAP
1231                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd, 
1232                    pcdp_summary, &rra_time, rrd_file->file_start);
1233 #else
1234                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd, 
1235                    pcdp_summary, &rra_time);
1236 #endif
1237                 if (rrd_test_error()) break;
1238
1239                 /* write other rows of the bulk update, if any */
1240                 scratch_idx = CDP_secondary_val;
1241                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1242                 {
1243                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1244                    {
1245 #ifdef DEBUG
1246               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1247                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1248 #endif
1249                           /* wrap */
1250                           rrd.rra_ptr[i].cur_row = 0;
1251                           /* seek back to beginning of current rra */
1252                       if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0)
1253                           {
1254                          rrd_set_error("seek error in rrd");
1255                          break;
1256                           }
1257 #ifdef DEBUG
1258                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",rrd_file->pos);
1259 #endif
1260                           rra_current = rra_start;
1261                    }
1262                    if (pcdp_summary != NULL)
1263                    {
1264                       rra_time = (current_time - current_time 
1265                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1266                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1267                    }
1268 #ifdef HAVE_MMAP
1269                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1270                       pcdp_summary, &rra_time, rrd_file->file_start);
1271 #else
1272                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1273                       pcdp_summary, &rra_time);
1274 #endif
1275                 }
1276                 
1277                 if (rrd_test_error())
1278                   break;
1279                 } /* RRA LOOP */
1280
1281             /* break out of the argument parsing loop if error_string is set */
1282             if (rrd_test_error()){
1283                    free(step_start);
1284                    break;
1285             } 
1286             
1287         } /* endif a pdp_st has occurred */ 
1288         rrd.live_head->last_up = current_time;
1289         rrd.live_head->last_up_usec = current_time_usec; 
1290         free(step_start);
1291     } /* function argument loop */
1292
1293     if (seasonal_coef != NULL) free(seasonal_coef);
1294     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1295         if (rra_step_cnt != NULL) free(rra_step_cnt);
1296     rpnstack_free(&rpnstack);
1297
1298 #ifdef HAVE_MMAP
1299     if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1300             rrd_set_error("error writing(unmapping) file: %s", filename);
1301     }
1302 #endif    
1303     /* if we got here and if there is an error and if the file has not been
1304      * written to, then close things up and return. */
1305     if (rrd_test_error()) {
1306         free(updvals);
1307         free(tmpl_idx);
1308         rrd_free(&rrd);
1309         free(pdp_temp);
1310         free(pdp_new);
1311         close(rrd_file->fd);
1312         return(-1);
1313     }
1314
1315     /* aargh ... that was tough ... so many loops ... anyway, its done.
1316      * we just need to write back the live header portion now*/
1317
1318     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1319                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1320                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1321               SEEK_SET) != 0) {
1322         rrd_set_error("seek rrd for live header writeback");
1323         free(updvals);
1324         free(tmpl_idx);
1325         rrd_free(&rrd);
1326         free(pdp_temp);
1327         free(pdp_new);
1328         close(rrd_file->fd);
1329         return(-1);
1330     }
1331
1332     if(version >= 3) {
1333             if(rrd_write(rrd_file, rrd.live_head,
1334                        sizeof(live_head_t)*1) != sizeof(live_head_t)*1){
1335                 rrd_set_error("rrd_write live_head to rrd");
1336                 free(updvals);
1337                 rrd_free(&rrd);
1338                 free(tmpl_idx);
1339                 free(pdp_temp);
1340                 free(pdp_new);
1341                 close(rrd_file->fd);
1342                 return(-1);
1343             }
1344     }
1345     else {
1346             if(rrd_write(rrd_file, &rrd.live_head->last_up,
1347                        sizeof(time_t)*1) != sizeof(time_t)*1){
1348                 rrd_set_error("rrd_write live_head to rrd");
1349                 free(updvals);
1350                 rrd_free(&rrd);
1351                 free(tmpl_idx);
1352                 free(pdp_temp);
1353                 free(pdp_new);
1354                 close(rrd_file->fd);
1355                 return(-1);
1356             }
1357     }
1358             
1359
1360     if(rrd_write(rrd_file, rrd.pdp_prep,
1361                sizeof(pdp_prep_t)*rrd.stat_head->ds_cnt)
1362             != (ssize_t)(sizeof(pdp_prep_t)*rrd.stat_head->ds_cnt)){
1363         rrd_set_error("rrd_write pdp_prep to rrd");
1364         free(updvals);
1365         rrd_free(&rrd);
1366         free(tmpl_idx);
1367         free(pdp_temp);
1368         free(pdp_new);
1369         close(rrd_file->fd);
1370         return(-1);
1371     }
1372
1373     if(rrd_write(rrd_file, rrd.cdp_prep,
1374                sizeof(cdp_prep_t)*rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt)
1375        != (ssize_t)(sizeof(cdp_prep_t)*rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt)){
1376
1377         rrd_set_error("rrd_write cdp_prep to rrd");
1378         free(updvals);
1379         free(tmpl_idx);
1380         rrd_free(&rrd);
1381         free(pdp_temp);
1382         free(pdp_new);
1383         close(rrd_file->fd);
1384         return(-1);
1385     }
1386
1387     if(rrd_write(rrd_file, rrd.rra_ptr,
1388                sizeof(rra_ptr_t)* rrd.stat_head->rra_cnt)
1389             != (ssize_t)(sizeof(rra_ptr_t)*rrd.stat_head->rra_cnt)){
1390         rrd_set_error("rrd_write rra_ptr to rrd");
1391         free(updvals);
1392         free(tmpl_idx);
1393         rrd_free(&rrd);
1394         free(pdp_temp);
1395         free(pdp_new);
1396         close(rrd_file->fd);
1397         return(-1);
1398     }
1399     
1400 #ifdef HAVE_POSIX_FADVISExxx
1401
1402     /* with update we have write ops, so they will probably not be done by now, this means
1403        the buffers will not get freed. But calling this for the whole file - header
1404        will let the data off the hook as soon as it is written when if it is from a previous
1405        update cycle. Calling fdsync to force things is much too hard here. */
1406
1407     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1408          rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1409          close(rrd_file->fd);
1410          return(-1);
1411     } 
1412 #endif
1413     /*XXX: ? */rrd_flush(rrd_file);
1414
1415     /* calling the smoothing code here guarantees at most
1416          * one smoothing operation per rrd_update call. Unfortunately,
1417          * it is possible with bulk updates, or a long-delayed update
1418          * for smoothing to occur off-schedule. This really isn't
1419          * critical except during the burning cycles. */
1420         if (schedule_smooth)
1421         {
1422 //        in_file = fopen(filename,"rb+");
1423           
1424
1425           rra_start = rra_begin;
1426           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1427           {
1428             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430             {
1431 #ifdef DEBUG
1432               fprintf(stderr,"Running smoother for rra %ld\n",i);
1433 #endif
1434               apply_smoother(&rrd,i,rra_start,rrd_file);
1435               if (rrd_test_error())
1436                 break;
1437             }
1438             rra_start += rrd.rra_def[i].row_cnt
1439               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1440           }
1441 #ifdef HAVE_POSIX_FADVISExxx
1442           /* same procedure as above ... */
1443           if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1444              rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1445              close(rrd_file->fd);
1446              return(-1);
1447           } 
1448 #endif
1449           close(rrd_file->fd);
1450         }
1451
1452     /* OK now close the files and free the memory */
1453     if(close(rrd_file->fd) != 0){
1454         rrd_set_error("closing rrd");
1455         free(updvals);
1456         free(tmpl_idx);
1457         rrd_free(&rrd);
1458         free(pdp_temp);
1459         free(pdp_new);
1460         return(-1);
1461     }
1462
1463     rrd_free(&rrd);
1464     free(updvals);
1465     free(tmpl_idx);
1466     free(pdp_new);
1467     free(pdp_temp);
1468     return(0);
1469 }
1470
/*
 * Try to take an exclusive lock on the whole file behind the descriptor
 * in_file, without blocking.
 *
 * Native Windows builds query the file size with _fstat() and lock that
 * many bytes with _locking(_LK_NBLCK); if _fstat() fails no lock is
 * attempted and -1 is returned.  All other builds request a write lock
 * with fcntl(F_SETLK) starting at offset 0 (relative to SEEK_SET) with
 * length 0, which covers the entire file.
 *
 * The lock is dropped when the file is closed.
 *
 * Returns the status of the locking call: 0 on success, non-zero on
 * failure (including when the lock cannot be taken).
 */
int
LockRRD(int in_file)
{
    int rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    if ( _fstat( in_file, &st ) == 0 ) {
        rcstat = _locking ( in_file, _LK_NBLCK, st.st_size );
    } else {
        rcstat = -1;
    }
#else
    /* zero everything first so that members we do not set explicitly
     * (l_pid and any platform specific extras) are never handed to
     * fcntl() with indeterminate contents */
    struct flock lock = { 0 };

    lock.l_type   = F_WRLCK;   /* exclusive write lock */
    lock.l_whence = SEEK_SET;  /* l_start counts from the start of the file */
    lock.l_start  = 0;         /* begin at the first byte */
    lock.l_len    = 0;         /* 0 means: up to the end of the file */

    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return rcstat;
}
1504
1505
1506 #ifdef HAVE_MMAP
1507 info_t
1508 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1509                unsigned short CDP_scratch_idx, 
1510 #ifndef DEBUG
1511 int UNUSED(in_file),
1512 #else
1513 int in_file,
1514 #endif
1515                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1516 #else
1517 info_t
1518 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1519                unsigned short CDP_scratch_idx, int in_file,
1520                    info_t *pcdp_summary, time_t *rra_time)
1521 #endif
1522 {
1523    unsigned long ds_idx, cdp_idx;
1524    infoval iv;
1525   
1526    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1527    {
1528       /* compute the cdp index */
1529       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1530 #ifdef DEBUG
1531           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1532              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,rrd_file->pos,
1533              rrd -> rra_def[rra_idx].cf_nam);
1534 #endif 
1535       if (pcdp_summary != NULL)
1536           {
1537              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1538              /* append info to the return hash */
1539                  pcdp_summary = info_push(pcdp_summary,
1540                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1541                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1542                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1543          RD_I_VAL, iv);
1544           }
1545 #ifdef HAVE_MMAP
1546           memcpy((char *)rrd_mmaped_file + *rra_current,
1547                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1548                           sizeof(rrd_value_t));
1549 #else
1550           if(rrd_write(rrd_file,&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1551                  sizeof(rrd_value_t)*1) != sizeof(rrd_value_t)*1)
1552           { 
1553              rrd_set_error("writing rrd");
1554              return 0;
1555           }
1556 #endif
1557           *rra_current += sizeof(rrd_value_t);
1558         }
1559         return (pcdp_summary);
1560 }