fix rounding issues that prevented UNKNOWN to work as soon as subsecond resolution...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.11  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
/* Stand-in for struct timeval on native WIN32 builds. */
struct timeval {
        time_t tv_sec;   /* whole seconds */
        long   tv_usec;  /* microsecond part */
};
39
/* Time zone argument type taken by the WIN32 gettimeofday() replacement. */
struct __timezone {
        int tz_minuteswest;  /* offset west of Greenwich, in minutes */
        int tz_dsttime;      /* kind of DST correction */
};
44
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46         
47         struct timeb current_time;
48
49         _ftime(&current_time);
50         
51         t->tv_sec  = current_time.time;
52         t->tv_usec = current_time.millitm * 1000;
53 }
54
55 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must always be >= 0.  When tv_usec is negative, one second is borrowed
 * from tv_sec and added to tv_usec as 1000000 microseconds.  Only a
 * single second is ever borrowed.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;         /* nothing to adjust */

        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
67
/* Local prototypes */

/* Locks an open RRD file; _rrd_update() treats a non-zero return as
 * "could not lock RRD". */
int LockRRD(FILE *rrd_file);

/* write_RRA_row() has two signatures: the HAVE_MMAP build takes an extra
 * rrd_mmaped_file pointer, and in that build rrd_file is marked UNUSED
 * unless DEBUG is defined. */
#ifdef HAVE_MMAP
info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
                                        unsigned long *rra_current,
                                        unsigned short CDP_scratch_idx,
#ifndef DEBUG
FILE UNUSED(*rrd_file),
#else
FILE *rrd_file,
#endif
                                        info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
#else
info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
                                        unsigned long *rra_current,
                                        unsigned short CDP_scratch_idx, FILE *rrd_file,
                                        info_t *pcdp_summary, time_t *rra_time);
#endif

/* Public update entry point that takes the file name and template
 * explicitly; forwards to _rrd_update() with a NULL summary. */
int rrd_update_r(char *filename, char *template, int argc, char **argv);

/* Shared worker behind rrd_update_r() and rrd_update_v(); the trailing
 * info_t* is the pcdp_summary argument. */
int _rrd_update(char *filename, char *template, int argc, char **argv, 
                                        info_t*);
89
/* IFDNAN(X,Y) yields Y when X is NaN and X otherwise.
 * NOTE(review): the expansion itself ends in ';', so the macro is only
 * usable as the tail of a statement, and X may be evaluated more than
 * once.  Confirm against all call sites before changing it. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
91
92
93 #ifdef STANDALONE
94 int 
95 main(int argc, char **argv){
96         rrd_update(argc,argv);
97         if (rrd_test_error()) {
98                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
99                         "Usage: rrdupdate filename\n"
100                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101                         "\t\t\ttime|N:value[:value...]\n\n"
102                         "\t\t\tat-time@value[:value...]\n\n"
103                         "\t\t\t[ time:value[:value...] ..]\n\n");
104                                    
105                 printf("ERROR: %s\n",rrd_get_error());
106                 rrd_clear_error();                                                            
107                 return 1;
108         }
109         return 0;
110 }
111 #endif
112
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115     char             *template = NULL;          
116         info_t *result = NULL;
117         infoval rc;
118     optind = 0; opterr = 0;  /* initialize getopt */
119
120     while (1) {
121                 static struct option long_options[] =
122                         {
123                                 {"template",      required_argument, 0, 't'},
124                                 {0,0,0,0}
125                         };
126                 int option_index = 0;
127                 int opt;
128                 opt = getopt_long(argc, argv, "t:", 
129                                                   long_options, &option_index);
130                 
131                 if (opt == EOF)
132                         break;
133                 
134                 switch(opt) {
135                 case 't':
136                         template = optarg;
137                         break;
138                 
139                 case '?':
140                         rrd_set_error("unknown option '%s'",argv[optind-1]);
141             rc.u_int = -1;
142                         goto end_tag;
143                 }
144     }
145
146     /* need at least 2 arguments: filename, data. */
147     if (argc-optind < 2) {
148                 rrd_set_error("Not enough arguments");
149         rc.u_int = -1;
150                 goto end_tag;
151     }
152     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153         rc.u_int = _rrd_update(argv[optind], template,
154                       argc - optind - 1, argv + optind + 1, result);
155     result->value.u_int = rc.u_int;
156 end_tag:
157     return result;
158 }
159
/*
 * Command-line style entry point for updating an RRD.
 *
 * argv is scanned with getopt_long() for an optional --template/-t
 * argument.  The first remaining argument is taken as the RRD file name
 * and everything after it is handed to rrd_update_r() as update data.
 *
 * Returns whatever rrd_update_r() returns, or -1 with the rrd error set
 * when an unknown option is met or fewer than two non-option arguments
 * remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template",      required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *template = NULL;
    int   remaining;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
    }

    /* filename plus at least one data argument must be left over */
    remaining = argc - optind;
    if (remaining < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        remaining - 1, argv + optind + 1);
}
203
204 int
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
206 {
207    return _rrd_update(filename, template, argc, argv, NULL);
208 }
209
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv, 
212    info_t *pcdp_summary)
213 {
214
215     int              arg_i = 2;
216     short            j;
217     unsigned long    i,ii,iii=1;
218
219     unsigned long    rra_begin;          /* byte pointer to the rra
220                                           * area in the rrd file.  this
221                                           * pointer never changes value */
222     unsigned long    rra_start;          /* byte pointer to the rra
223                                           * area in the rrd file.  this
224                                           * pointer changes as each rrd is
225                                           * processed. */
226     unsigned long    rra_current;        /* byte pointer to the current write
227                                           * spot in the rrd file. */
228     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
229     double           interval,
230         pre_int,post_int;                /* interval between this and
231                                           * the last run */
232     unsigned long    proc_pdp_st;        /* which pdp_st was the last
233                                           * to be processed */
234     unsigned long    occu_pdp_st;        /* when was the pdp_st
235                                           * before the last update
236                                           * time */
237     unsigned long    proc_pdp_age;       /* how old was the data in
238                                           * the pdp prep area when it
239                                           * was last updated */
240     unsigned long    occu_pdp_age;       /* how long ago was the last
241                                           * pdp_step time */
242     rrd_value_t      *pdp_new;           /* prepare the incoming data
243                                           * to be added the the
244                                           * existing entry */
245     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
246                                           * to be added the the
247                                           * cdp values */
248
249     long             *tmpl_idx;          /* index representing the settings
250                                             transported by the template index */
251     unsigned long    tmpl_cnt = 2;       /* time and data */
252
253     FILE             *rrd_file;
254     rrd_t            rrd;
255     time_t           current_time = 0;
256     time_t           rra_time = 0;       /* time of update for a RRA */
257     unsigned long    current_time_usec=0;/* microseconds part of current time */
258     struct timeval   tmp_time;           /* used for time conversion */
259
260     char             **updvals;
261     int              schedule_smooth = 0;
262         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263                                          /* a vector of future Holt-Winters seasonal coefs */
264     unsigned long    elapsed_pdp_st;
265                                          /* number of elapsed PDP steps since last update */
266     unsigned long    *rra_step_cnt = NULL;
267                                          /* number of rows to be updated in an RRA for a data
268                                           * value. */
269     unsigned long    start_pdp_offset;
270                                          /* number of PDP steps since the last update that
271                                           * are assigned to the first CDP to be generated
272                                           * since the last update. */
273     unsigned short   scratch_idx;
274                                          /* index into the CDP scratch array */
275     enum cf_en       current_cf;
276                                          /* numeric id of the current consolidation function */
277     rpnstack_t       rpnstack; /* used for COMPUTE DS */
278     int              version;  /* rrd version */
279     char             *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281     void             *rrd_mmaped_file;
282     unsigned long    rrd_filesize;
283 #endif
284
285     rpnstack_init(&rpnstack);
286
287     /* need at least 1 arguments: data. */
288     if (argc < 1) {
289         rrd_set_error("Not enough arguments");
290         return -1;
291     }
292     
293     
294
295     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296         return -1;
297     }
298     /* initialize time */
299     version = atoi(rrd.stat_head->version);
300     gettimeofday(&tmp_time, 0);
301     normalize_time(&tmp_time);
302     current_time = tmp_time.tv_sec;
303     if(version >= 3) {
304         current_time_usec = tmp_time.tv_usec;
305     }
306     else {
307         current_time_usec = 0;
308     }
309
310     rra_current = rra_start = rra_begin = ftell(rrd_file);
    /* This is defined in the ANSI C standard, section 7.9.5.3:

        When a file is opened with update mode ('+' as the second
        or third character in the ... list of mode argument
        variables), both input and output may be performed on the
        associated stream.  However, ...  input may not be directly
        followed by output without an intervening call to a file
        positioning function, unless the input operation encounters
        end-of-file. */
320 #ifdef HAVE_MMAP
321     fseek(rrd_file, 0, SEEK_END);
322     rrd_filesize = ftell(rrd_file);
323     fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325     fseek(rrd_file, 0, SEEK_CUR);
326 #endif
327
328     
329     /* get exclusive lock to whole file.
330      * lock gets removed when we close the file.
331      */
332     if (LockRRD(rrd_file) != 0) {
333       rrd_set_error("could not lock RRD");
334       rrd_free(&rrd);
335       fclose(rrd_file);
336       return(-1);   
337     } 
338
339     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340         rrd_set_error("allocating updvals pointer array");
341         rrd_free(&rrd);
342         fclose(rrd_file);
343         return(-1);
344     }
345
346     if ((pdp_temp = malloc(sizeof(rrd_value_t)
347                            *rrd.stat_head->ds_cnt))==NULL){
348         rrd_set_error("allocating pdp_temp ...");
349         free(updvals);
350         rrd_free(&rrd);
351         fclose(rrd_file);
352         return(-1);
353     }
354
355     if ((tmpl_idx = malloc(sizeof(unsigned long)
356                            *(rrd.stat_head->ds_cnt+1)))==NULL){
357         rrd_set_error("allocating tmpl_idx ...");
358         free(pdp_temp);
359         free(updvals);
360         rrd_free(&rrd);
361         fclose(rrd_file);
362         return(-1);
363     }
364     /* initialize template redirector */
365     /* default config example (assume DS 1 is a CDEF DS)
366        tmpl_idx[0] -> 0; (time)
367        tmpl_idx[1] -> 1; (DS 0)
368        tmpl_idx[2] -> 3; (DS 2)
369        tmpl_idx[3] -> 4; (DS 3) */
370     tmpl_idx[0] = 0; /* time */
371     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
372         {
373            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374               tmpl_idx[ii++]=i;
375         }
376     tmpl_cnt= ii;
377
378     if (template) {
379         char *dsname;
380         unsigned int tmpl_len;
381         dsname = template;
382         tmpl_cnt = 1; /* the first entry is the time */
383         tmpl_len = strlen(template);
384         for(i=0;i<=tmpl_len ;i++) {
385             if (template[i] == ':' || template[i] == '\0') {
386                 template[i] = '\0';
387                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388                     rrd_set_error("Template contains more DS definitions than RRD");
389                     free(updvals); free(pdp_temp);
390                     free(tmpl_idx); rrd_free(&rrd);
391                     fclose(rrd_file); return(-1);
392                 }
393                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394                     rrd_set_error("unknown DS name '%s'",dsname);
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 } else {
399                   /* the first element is always the time */
400                   tmpl_idx[tmpl_cnt-1]++; 
401                   /* go to the next entry on the template */
402                   dsname = &template[i+1];
403                   /* fix the damage we did before */
404                   if (i<tmpl_len) {
405                      template[i]=':';
406                   } 
407
408                 }
409             }       
410         }
411     }
412     if ((pdp_new = malloc(sizeof(rrd_value_t)
413                           *rrd.stat_head->ds_cnt))==NULL){
414         rrd_set_error("allocating pdp_new ...");
415         free(updvals);
416         free(pdp_temp);
417         free(tmpl_idx);
418         rrd_free(&rrd);
419         fclose(rrd_file);
420         return(-1);
421     }
422
423 #ifdef HAVE_MMAP
424     rrd_mmaped_file = mmap(0, 
425                         rrd_filesize, 
426                         PROT_READ | PROT_WRITE, 
427                         MAP_SHARED, 
428                         fileno(rrd_file), 
429                         0);
430     if (rrd_mmaped_file == MAP_FAILED) {
431         rrd_set_error("error mmapping file %s", filename);
432         free(updvals);
433         free(pdp_temp);
434         free(tmpl_idx);
435         rrd_free(&rrd);
436         fclose(rrd_file);
437         return(-1);
438     }
439 #endif
440     /* loop through the arguments. */
441     for(arg_i=0; arg_i<argc;arg_i++) {
442         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443         char *step_start = stepper;
444         char *p;
445         char *parsetime_error = NULL;
446         enum {atstyle, normal} timesyntax;
447         struct rrd_time_value ds_tv;
448         if (stepper == NULL){
449                 rrd_set_error("failed duplication argv entry");
450                 free(updvals);
451                 free(pdp_temp);  
452                 free(tmpl_idx);
453                 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455                 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457                 fclose(rrd_file);
458                 return(-1);
459          }
460         /* initialize all ds input to unknown except the first one
461            which has always got to be set */
462         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463         strcpy(stepper,argv[arg_i]);
464         updvals[0]=stepper;
465         /* separate all ds elements; first must be examined separately
466            due to alternate time syntax */
467         if ((p=strchr(stepper,'@'))!=NULL) {
468             timesyntax = atstyle;
469             *p = '\0';
470             stepper = p+1;
471         } else if ((p=strchr(stepper,':'))!=NULL) {
472             timesyntax = normal;
473             *p = '\0';
474             stepper = p+1;
475         } else {
476             rrd_set_error("expected timestamp not found in data source from %s:...",
477                           argv[arg_i]);
478             free(step_start);
479             break;
480         }
481         ii=1;
482         updvals[tmpl_idx[ii]] = stepper;
483         while (*stepper) {
484             if (*stepper == ':') {
485                 *stepper = '\0';
486                 ii++;
487                 if (ii<tmpl_cnt){                   
488                     updvals[tmpl_idx[ii]] = stepper+1;
489                 }
490             }
491             stepper++;
492         }
493
494         if (ii != tmpl_cnt-1) {
495             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496                           tmpl_cnt-1, ii, argv[arg_i]);
497             free(step_start);
498             break;
499         }
500         
501         /* get the time from the reading ... handle N */
502         if (timesyntax == atstyle) {
503             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505                 free(step_start);
506                 break;
507             }
508             if (ds_tv.type == RELATIVE_TO_END_TIME ||
509                 ds_tv.type == RELATIVE_TO_START_TIME) {
510                 rrd_set_error("specifying time relative to the 'start' "
511                               "or 'end' makes no sense here: %s",
512                               updvals[0]);
513                 free(step_start);
514                 break;
515             }
516
517             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
519             
520         } else if (strcmp(updvals[0],"N")==0){
521             gettimeofday(&tmp_time, 0);
522             normalize_time(&tmp_time);
523             current_time = tmp_time.tv_sec;
524             current_time_usec = tmp_time.tv_usec;
525         } else {
526             double tmp;
527             tmp = strtod(updvals[0], 0);
528             current_time = floor(tmp);
529             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
530         }
531         /* dont do any correction for old version RRDs */
532         if(version < 3) 
533             current_time_usec = 0;
534         
535         if(current_time < rrd.live_head->last_up || 
536           (current_time == rrd.live_head->last_up && 
537            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
538             rrd_set_error("illegal attempt to update using time %ld when "
539                           "last update time is %ld (minimum one second step)",
540                           current_time, rrd.live_head->last_up);
541             free(step_start);
542             break;
543         }
544         
545         
546         /* seek to the beginning of the rra's */
547         if (rra_current != rra_begin) {
548 #ifndef HAVE_MMAP
549             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
550                 rrd_set_error("seek error in rrd");
551                 free(step_start);
552                 break;
553             }
554 #endif
555             rra_current = rra_begin;
556         }
557         rra_start = rra_begin;
558
559         /* when was the current pdp started */
560         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
561         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
562
563         /* when did the last pdp_st occur */
564         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
565         occu_pdp_st = current_time - occu_pdp_age;
566
567         /* interval = current_time - rrd.live_head->last_up; */
568         interval    = (double)(current_time - rrd.live_head->last_up) 
569                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
570
571         if (occu_pdp_st > proc_pdp_st){
572             /* OK we passed the pdp_st moment*/
573             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
574                                                               * occurred before the latest
575                                                               * pdp_st moment*/
576             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
577             post_int = occu_pdp_age;                         /* how much after it */
578             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
579         } else {
580             pre_int = interval;
581             post_int = 0;
582         }
583
584 #ifdef DEBUG
585         printf(
586                "proc_pdp_age %lu\t"
587                "proc_pdp_st %lu\t" 
588                "occu_pfp_age %lu\t" 
589                "occu_pdp_st %lu\t"
590                "int %lf\t"
591                "pre_int %lf\t"
592                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
593                 occu_pdp_age, occu_pdp_st,
594                interval, pre_int, post_int);
595 #endif
596     
597         /* process the data sources and update the pdp_prep 
598          * area accordingly */
599         for(i=0;i<rrd.stat_head->ds_cnt;i++){
600             enum dst_en dst_idx;
601             dst_idx= dst_conv(rrd.ds_def[i].dst);
602
603             /* make sure we do not build diffs with old last_ds values */
604             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
605                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
606                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
607             }
608
609             /* NOTE: DST_CDEF should never enter this if block, because
610              * updvals[i+1][0] is initialized to 'U'; unless the caller
611              * accidently specified a value for the DST_CDEF. To handle 
612               * this case, an extra check is required. */
613
614             if((updvals[i+1][0] != 'U') &&
615                    (dst_idx != DST_CDEF) &&
616                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
617                double rate = DNAN;
618                /* the data source type defines how to process the data */
619                 /* pdp_new contains rate * time ... eg the bytes
620                  * transferred during the interval. Doing it this way saves
621                  * a lot of math operations */
622                 
623
624                 switch(dst_idx){
625                 case DST_COUNTER:
626                 case DST_DERIVE:
627                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
628                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
629                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
630                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
631                                  break;
632                             }
633                        }
634                        if (rrd_test_error()){
635                             break;
636                        }
637                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
638                        if(dst_idx == DST_COUNTER) {
639                           /* simple overflow catcher suggested by Andres Kroonmaa */
640                           /* this will fail terribly for non 32 or 64 bit counters ... */
641                           /* are there any others in SNMP land ? */
642                           if (pdp_new[i] < (double)0.0 ) 
643                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
644                           if (pdp_new[i] < (double)0.0 ) 
645                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
646                        }
647                        rate = pdp_new[i] / interval;
648                     }
649                    else {
650                      pdp_new[i]= DNAN;          
651                    }
652                    break;
653                 case DST_ABSOLUTE:
654                     errno = 0;
655                     pdp_new[i] = strtod(updvals[i+1],&endptr);
656                     if (errno > 0){
657                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
658                         break;
659                     };
660                     if (endptr[0] != '\0'){
661                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
662                         break;
663                     }
664                     rate = pdp_new[i] / interval;                 
665                     break;
666                 case DST_GAUGE:
667                     errno = 0;
668                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
669                     if (errno > 0){
670                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
671                         break;
672                     };
673                     if (endptr[0] != '\0'){
674                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
675                         break;
676                     }
677                     rate = pdp_new[i] / interval;                  
678                     break;
679                 default:
680                     rrd_set_error("rrd contains unknown DS type : '%s'",
681                                   rrd.ds_def[i].dst);
682                     break;
683                 }
684                 /* break out of this for loop if the error string is set */
685                 if (rrd_test_error()){
686                     break;
687                 }
688                /* make sure pdp_temp is neither too large or too small
689                 * if any of these occur it becomes unknown ...
690                 * sorry folks ... */
691                if ( ! isnan(rate) && 
692                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
693                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
694                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
695                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
696                   pdp_new[i] = DNAN;
697                }               
698             } else {
699                 /* no news is news all the same */
700                 pdp_new[i] = DNAN;
701             }
702             
703             /* make a copy of the command line argument for the next run */
704 #ifdef DEBUG
705             fprintf(stderr,
706                     "prep ds[%lu]\t"
707                     "last_arg '%s'\t"
708                     "this_arg '%s'\t"
709                     "pdp_new %10.2f\n",
710                     i,
711                     rrd.pdp_prep[i].last_ds,
712                     updvals[i+1], pdp_new[i]);
713 #endif
714             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
715                 strncpy(rrd.pdp_prep[i].last_ds,
716                         updvals[i+1],LAST_DS_LEN-1);
717                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
718             }
719         }
720         /* break out of the argument parsing loop if the error_string is set */
721         if (rrd_test_error()){
722             free(step_start);
723             break;
724         }
725         /* has a pdp_st moment occurred since the last run ? */
726
727         if (proc_pdp_st == occu_pdp_st){
728             /* no we have not passed a pdp_st moment. therefore update is simple */
729
730             for(i=0;i<rrd.stat_head->ds_cnt;i++){
731                 if(isnan(pdp_new[i]))
732                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval-0.5);
733                 else
734                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
735 #ifdef DEBUG
736                 fprintf(stderr,
737                         "NO PDP  ds[%lu]\t"
738                         "value %10.2f\t"
739                         "unkn_sec %5lu\n",
740                         i,
741                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
742                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
743 #endif
744             }   
745         } else {
746             /* an pdp_st has occurred. */
747
748             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
749              * occurred up to the last run.        
750             pdp_new[] contains rate*seconds from the latest run.
751             pdp_temp[] will contain the rate for cdp */
752
753             for(i=0;i<rrd.stat_head->ds_cnt;i++){
754                 /* update pdp_prep to the current pdp_st */
755                 if(isnan(pdp_new[i]))
756                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
757                 else
758                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
759                         pdp_new[i]/interval*pre_int;
760
761                 /* if too much of the pdp_prep is unknown we dump it */
762                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
763                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
764                     (occu_pdp_st-proc_pdp_st <= 
765                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
766                     pdp_temp[i] = DNAN;
767                 } else {
768                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
769                         / (double)( occu_pdp_st
770                                     - proc_pdp_st
771                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
772                 }
773
774                 /* process CDEF data sources; remember each CDEF DS can
775                  * only reference other DS with a lower index number */
776             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
777                    rpnp_t *rpnp;
778                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
779                    /* substitute data values for OP_VARIABLE nodes */
780                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
781                    {
782                           if (rpnp[ii].op == OP_VARIABLE) {
783                                  rpnp[ii].op = OP_NUMBER;
784                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
785                           }
786                    }
787                    /* run the rpn calculator */
788                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
789                           free(rpnp);
790                           break; /* exits the data sources pdp_temp loop */
791                    }
792                 }
793         
794                 /* make pdp_prep ready for the next run */
795                 if(isnan(pdp_new[i])){
796                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
797                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
798                 } else {
799                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
800                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
801                         pdp_new[i]/interval*post_int;
802                 }
803
804 #ifdef DEBUG
805                 fprintf(stderr,
806                         "PDP UPD ds[%lu]\t"
807                         "pdp_temp %10.2f\t"
808                         "new_prep %10.2f\t"
809                         "new_unkn_sec %5lu\n",
810                         i, pdp_temp[i],
811                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
812                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
813 #endif
814             }
815
816                 /* if there were errors during the last loop, bail out here */
817             if (rrd_test_error()){
818                free(step_start);
819                break;
820             }
821
822                 /* compute the number of elapsed pdp_st moments */
823                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
824 #ifdef DEBUG
825                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
826 #endif
827                 if (rra_step_cnt == NULL)
828                 {
829                    rra_step_cnt = (unsigned long *) 
830                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
831                 }
832
833             for(i = 0, rra_start = rra_begin;
834                 i < rrd.stat_head->rra_cnt;
835             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
836                 i++)
837                 {
838                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
839                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
840                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
841         if (start_pdp_offset <= elapsed_pdp_st) {
842            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
843                       rrd.rra_def[i].pdp_cnt + 1;
844             } else {
845                    rra_step_cnt[i] = 0;
846                 }
847
848                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
849                 {
850                    /* If this is a bulk update, we need to skip ahead in the seasonal
851                         * arrays so that they will be correct for the next observed value;
852                         * note that for the bulk update itself, no update will occur to
853                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
854                         * be set to DNAN. */
855            if (rra_step_cnt[i] > 2) 
856                    {
857                           /* skip update by resetting rra_step_cnt[i],
858                            * note that this is not data source specific; this is due
859                            * to the bulk update, not a DNAN value for the specific data
860                            * source. */
861                           rra_step_cnt[i] = 0;
862               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
863                              &last_seasonal_coef);
864                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
865                              &seasonal_coef);
866                    }
867                 
868                   /* periodically run a smoother for seasonal effects */
869                   /* Need to use first cdp parameter buffer to track
870                    * burnin (burnin requires a specific smoothing schedule).
871                    * The CDP_init_seasonal parameter is really an RRA level,
872                    * not a data source within RRA level parameter, but the rra_def
873                    * is read only for rrd_update (not flushed to disk). */
874                   iii = i*(rrd.stat_head -> ds_cnt);
875                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
876                           <= BURNIN_CYCLES)
877                   {
878                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
879                                  > rrd.rra_def[i].row_cnt - 1) {
880                            /* mark off one of the burnin cycles */
881                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
882                        schedule_smooth = 1;
883                          }  
884                   } else {
885                          /* someone has no doubt invented a trick to deal with this
886                           * wrap around, but at least this code is clear. */
887                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
888                              rrd.rra_ptr[i].cur_row)
889                          {
890                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
891                                   * mapping between PDP and CDP */
892                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
893                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
894                                  {
895 #ifdef DEBUG
896                                         fprintf(stderr,
897                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
898                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
899                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
900 #endif
901                                         schedule_smooth = 1;
902                                  }
903              } else {
904                                  /* can't rely on negative numbers because we are working with
905                                   * unsigned values */
906                                  /* Don't need modulus here. If we've wrapped more than once, only
907                                   * one smooth is executed at the end. */
908                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
909                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
910                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
911                                  {
912 #ifdef DEBUG
913                                         fprintf(stderr,
914                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
915                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
916                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
917 #endif
918                                         schedule_smooth = 1;
919                                  }
920                          }
921                   }
922
923               rra_current = ftell(rrd_file); 
924                 } /* if cf is DEVSEASONAL or SEASONAL */
925
926         if (rrd_test_error()) break;
927
928                     /* update CDP_PREP areas */
929                     /* loop over data sources within each RRA */
930                     for(ii = 0;
931                         ii < rrd.stat_head->ds_cnt;
932                         ii++)
933                         {
934                         
935                         /* iii indexes the CDP prep area for this data source within the RRA */
936                         iii=i*rrd.stat_head->ds_cnt+ii;
937
938                         if (rrd.rra_def[i].pdp_cnt > 1) {
939                           
940                            if (rra_step_cnt[i] > 0) {
941                            /* If we are in this block, at least 1 CDP value will be written to
942                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
943                                 * to be written, then the "fill in" value is the CDP_secondary_val
944                                 * entry. */
945                                   if (isnan(pdp_temp[ii]))
946                   {
947                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
948                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
949                                   } else {
950                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
951                                           * CDP data entries. No matter the CF, the value is the same because
952                                           * the average, max, min, and last of a list of identical values is
953                                           * the same, namely, the value itself. */
954                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
955                                   }
956                      
957                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
958                                       > rrd.rra_def[i].pdp_cnt*
959                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
960                                   {
961                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
962                                          /* initialize carry over */
963                                          if (current_cf == CF_AVERAGE) {
964                                                    if (isnan(pdp_temp[ii])) { 
965                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
966                                                    } else {
967                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
968                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
969                                                    }
970                                          } else {
971                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
972                                          }
973                                   } else {
974                                          rrd_value_t cum_val, cur_val; 
975                                      switch (current_cf) {
976                                                 case CF_AVERAGE:
977                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
978                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
979                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
980                                                (cum_val + cur_val * start_pdp_offset) /
981                                            (rrd.rra_def[i].pdp_cnt
982                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
983                                                    /* initialize carry over value */
984                                                    if (isnan(pdp_temp[ii])) { 
985                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
986                                                    } else {
987                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
988                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
989                                                    }
990                                                    break;
991                                                 case CF_MAXIMUM:
992                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
993                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
994 #ifdef DEBUG
995                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
996                                                           isnan(pdp_temp[ii])) {
997                                                      fprintf(stderr,
998                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
999                                                                 i,ii);
1000                                                          exit(-1);
1001                                                   }
1002 #endif
1003                                                   if (cur_val > cum_val)
1004                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1005                                                   else
1006                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1007                                                   /* initialize carry over value */
1008                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1009                                                   break;
1010                                                 case CF_MINIMUM:
1011                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1012                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1013 #ifdef DEBUG
1014                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1015                                                           isnan(pdp_temp[ii])) {
1016                                                      fprintf(stderr,
1017                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1018                                                                 i,ii);
1019                                                          exit(-1);
1020                                                   }
1021 #endif
1022                                                   if (cur_val < cum_val)
1023                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1024                                                   else
1025                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1026                                                   /* initialize carry over value */
1027                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1028                                                   break;
1029                                                 case CF_LAST:
1030                                                 default:
1031                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1032                                                    /* initialize carry over value */
1033                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1034                                                 break;
1035                                          }
1036                                   } /* endif meets xff value requirement for a valid value */
1037                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1038                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1039                                   if (isnan(pdp_temp[ii]))
1040                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1041                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1042                                   else
1043                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1044                } else  /* rra_step_cnt[i]  == 0 */
1045                            {
1046 #ifdef DEBUG
1047                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1048                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1049                                          i,ii);
1050                                   } else {
1051                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1052                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1053                                   }
1054 #endif
1055                                   if (isnan(pdp_temp[ii])) {
1056                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1057                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1058                                   {
1059                                          if (current_cf == CF_AVERAGE) {
1060                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1061                                                    elapsed_pdp_st;
1062                                          } else {
1063                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1064                                          }
1065 #ifdef DEBUG
1066                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1067                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1068 #endif
1069                                   } else {
1070                                          switch (current_cf) {
1071                                          case CF_AVERAGE:
1072                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1073                                                    elapsed_pdp_st;
1074                                                 break;
1075                                          case CF_MINIMUM:
1076                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1077                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078                                                 break; 
1079                                          case CF_MAXIMUM:
1080                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1081                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082                                                 break; 
1083                                          case CF_LAST:
1084                                          default:
1085                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086                                                 break;
1087                                          }
1088                                   }
1089                            }
1090                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1091                            if (elapsed_pdp_st > 2)
1092                            {
1093                                    switch (current_cf) {
1094                                    case CF_AVERAGE:
1095                                    default:
1096                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1097                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1098                                           break;
1099                    case CF_SEASONAL:
1100                                    case CF_DEVSEASONAL:
1101                                           /* need to update cached seasonal values, so they are consistent
1102                                            * with the bulk update */
1103                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1104                                            * CDP_last_deviation are the same. */
1105                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1106                                                  last_seasonal_coef[ii];
1107                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1108                                                  seasonal_coef[ii];
1109                                           break;
1110                    case CF_HWPREDICT:
1111                                           /* need to update the null_count and last_null_count.
1112                                            * even do this for non-DNAN pdp_temp because the
1113                                            * algorithm is not learning from batch updates. */
1114                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1115                                                  elapsed_pdp_st;
1116                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1117                                                  elapsed_pdp_st - 1;
1118                                           /* fall through */
1119                                    case CF_DEVPREDICT:
1120                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1121                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1122                                           break;
1123                    case CF_FAILURES:
1124                                           /* do not count missed bulk values as failures */
1125                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1126                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1127                                           /* need to reset violations buffer.
1128                                            * could do this more carefully, but for now, just
1129                                            * assume a bulk update wipes away all violations. */
1130                       erase_violations(&rrd, iii, i);
1131                                           break;
1132                                    }
1133                            } 
1134                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1135
1136                         if (rrd_test_error()) break;
1137
1138                         } /* endif data sources loop */
1139         } /* end RRA Loop */
1140
1141                 /* this loop is only entered if elapsed_pdp_st < 3 */
1142                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1143                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1144                 {
1145                for(i = 0, rra_start = rra_begin;
1146                    i < rrd.stat_head->rra_cnt;
1147                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1148                    i++)
1149                    {
1150                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1151
1152                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1153                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1154                           {
1155                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1156                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1157                                 &seasonal_coef);
1158                  rra_current = ftell(rrd_file);
1159                           }
1160                           if (rrd_test_error()) break;
1161                       /* loop over data sources within each RRA */
1162                       for(ii = 0;
1163                           ii < rrd.stat_head->ds_cnt;
1164                           ii++)
1165                           {
1166                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1167                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1168                                     scratch_idx, seasonal_coef);
1169                           }
1170            } /* end RRA Loop */
1171                    if (rrd_test_error()) break;
1172             } /* end elapsed_pdp_st loop */
1173
1174                 if (rrd_test_error()) break;
1175
1176                 /* Ready to write to disk */
1177                 /* Move sequentially through the file, writing one RRA at a time.
1178                  * Note this architecture divorces the computation of CDP with
1179                  * flushing updated RRA entries to disk. */
1180             for(i = 0, rra_start = rra_begin;
1181                 i < rrd.stat_head->rra_cnt;
1182             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1183                 i++) {
1184                 /* is there anything to write for this RRA? If not, continue. */
1185         if (rra_step_cnt[i] == 0) continue;
1186
1187                 /* write the first row */
1188 #ifdef DEBUG
1189         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1190 #endif
1191             rrd.rra_ptr[i].cur_row++;
1192             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1193                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1194                 /* position on the first row */
1195                 rra_pos_tmp = rra_start +
1196                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1197                 if(rra_pos_tmp != rra_current) {
1198 #ifndef HAVE_MMAP
1199                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1200                       rrd_set_error("seek error in rrd");
1201                       break;
1202                    }
1203 #endif
1204                    rra_current = rra_pos_tmp;
1205                 }
1206
1207 #ifdef DEBUG
1208             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1209 #endif
1210                 scratch_idx = CDP_primary_val;
1211                 if (pcdp_summary != NULL)
1212                 {
1213                    rra_time = (current_time - current_time 
1214                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1215                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1216                 }
1217 #ifdef HAVE_MMAP
1218                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1219                    pcdp_summary, &rra_time, rrd_mmaped_file);
1220 #else
1221                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1222                    pcdp_summary, &rra_time);
1223 #endif
1224                 if (rrd_test_error()) break;
1225
1226                 /* write other rows of the bulk update, if any */
1227                 scratch_idx = CDP_secondary_val;
1228                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1229                 {
1230                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1231                    {
1232 #ifdef DEBUG
1233               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1234                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1235 #endif
1236                           /* wrap */
1237                           rrd.rra_ptr[i].cur_row = 0;
1238                           /* seek back to beginning of current rra */
1239                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1240                           {
1241                          rrd_set_error("seek error in rrd");
1242                          break;
1243                           }
1244 #ifdef DEBUG
1245                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1246 #endif
1247                           rra_current = rra_start;
1248                    }
1249                    if (pcdp_summary != NULL)
1250                    {
1251                       rra_time = (current_time - current_time 
1252                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1253                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1254                    }
1255 #ifdef HAVE_MMAP
1256                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1257                       pcdp_summary, &rra_time, rrd_mmaped_file);
1258 #else
1259                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1260                       pcdp_summary, &rra_time);
1261 #endif
1262                 }
1263                 
1264                 if (rrd_test_error())
1265                   break;
1266                 } /* RRA LOOP */
1267
1268             /* break out of the argument parsing loop if error_string is set */
1269             if (rrd_test_error()){
1270                    free(step_start);
1271                    break;
1272             } 
1273             
1274         } /* endif a pdp_st has occurred */ 
1275         rrd.live_head->last_up = current_time;
1276         rrd.live_head->last_up_usec = current_time_usec; 
1277         free(step_start);
1278     } /* function argument loop */
1279
1280     if (seasonal_coef != NULL) free(seasonal_coef);
1281     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1282         if (rra_step_cnt != NULL) free(rra_step_cnt);
1283     rpnstack_free(&rpnstack);
1284
1285 #ifdef HAVE_MMAP
1286     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1287             rrd_set_error("error writing(unmapping) file: %s", filename);
1288     }
1289 #endif    
1290     /* if we got here and if there is an error and if the file has not been
1291      * written to, then close things up and return. */
1292     if (rrd_test_error()) {
1293         free(updvals);
1294         free(tmpl_idx);
1295         rrd_free(&rrd);
1296         free(pdp_temp);
1297         free(pdp_new);
1298         fclose(rrd_file);
1299         return(-1);
1300     }
1301
1302     /* aargh ... that was tough ... so many loops ... anyway, its done.
1303      * we just need to write back the live header portion now*/
1304
1305     if (fseek(rrd_file, (sizeof(stat_head_t)
1306                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1307                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1308               SEEK_SET) != 0) {
1309         rrd_set_error("seek rrd for live header writeback");
1310         free(updvals);
1311         free(tmpl_idx);
1312         rrd_free(&rrd);
1313         free(pdp_temp);
1314         free(pdp_new);
1315         fclose(rrd_file);
1316         return(-1);
1317     }
1318
1319     if(version >= 3) {
1320             if(fwrite( rrd.live_head,
1321                        sizeof(live_head_t), 1, rrd_file) != 1){
1322                 rrd_set_error("fwrite live_head to rrd");
1323                 free(updvals);
1324                 rrd_free(&rrd);
1325                 free(tmpl_idx);
1326                 free(pdp_temp);
1327                 free(pdp_new);
1328                 fclose(rrd_file);
1329                 return(-1);
1330             }
1331     }
1332     else {
1333             if(fwrite( &rrd.live_head->last_up,
1334                        sizeof(time_t), 1, rrd_file) != 1){
1335                 rrd_set_error("fwrite live_head to rrd");
1336                 free(updvals);
1337                 rrd_free(&rrd);
1338                 free(tmpl_idx);
1339                 free(pdp_temp);
1340                 free(pdp_new);
1341                 fclose(rrd_file);
1342                 return(-1);
1343             }
1344     }
1345             
1346
1347     if(fwrite( rrd.pdp_prep,
1348                sizeof(pdp_prep_t),
1349                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1350         rrd_set_error("ftwrite pdp_prep to rrd");
1351         free(updvals);
1352         rrd_free(&rrd);
1353         free(tmpl_idx);
1354         free(pdp_temp);
1355         free(pdp_new);
1356         fclose(rrd_file);
1357         return(-1);
1358     }
1359
1360     if(fwrite( rrd.cdp_prep,
1361                sizeof(cdp_prep_t),
1362                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1363        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1364
1365         rrd_set_error("ftwrite cdp_prep to rrd");
1366         free(updvals);
1367         free(tmpl_idx);
1368         rrd_free(&rrd);
1369         free(pdp_temp);
1370         free(pdp_new);
1371         fclose(rrd_file);
1372         return(-1);
1373     }
1374
1375     if(fwrite( rrd.rra_ptr,
1376                sizeof(rra_ptr_t), 
1377                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1378         rrd_set_error("fwrite rra_ptr to rrd");
1379         free(updvals);
1380         free(tmpl_idx);
1381         rrd_free(&rrd);
1382         free(pdp_temp);
1383         free(pdp_new);
1384         fclose(rrd_file);
1385         return(-1);
1386     }
1387
1388     /* OK now close the files and free the memory */
1389     if(fclose(rrd_file) != 0){
1390         rrd_set_error("closing rrd");
1391         free(updvals);
1392         free(tmpl_idx);
1393         rrd_free(&rrd);
1394         free(pdp_temp);
1395         free(pdp_new);
1396         return(-1);
1397     }
1398
1399     /* calling the smoothing code here guarantees at most
1400          * one smoothing operation per rrd_update call. Unfortunately,
1401          * it is possible with bulk updates, or a long-delayed update
1402          * for smoothing to occur off-schedule. This really isn't
1403          * critical except during the burning cycles. */
1404         if (schedule_smooth)
1405         {
1406           rrd_file = fopen(filename,"rb+");
1407           rra_start = rra_begin;
1408           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1409           {
1410             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1411                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1412             {
1413 #ifdef DEBUG
1414               fprintf(stderr,"Running smoother for rra %ld\n",i);
1415 #endif
1416               apply_smoother(&rrd,i,rra_start,rrd_file);
1417               if (rrd_test_error())
1418                 break;
1419             }
1420             rra_start += rrd.rra_def[i].row_cnt
1421               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1422           }
1423           fclose(rrd_file);
1424         }
1425     rrd_free(&rrd);
1426     free(updvals);
1427     free(tmpl_idx);
1428     free(pdp_new);
1429     free(pdp_temp);
1430     return(0);
1431 }
1432
/*
 * Request an exclusive lock covering the whole RRD file.
 *
 * The request is non-blocking (F_SETLK on POSIX, _LK_NBLCK on native
 * Windows), so the call fails instead of waiting when the lock cannot
 * be granted.  The lock is removed when the file is closed.
 *
 * rrdfile: open stdio stream of the RRD file to lock.
 *
 * Returns 0 on success, otherwise the status of the platform locking
 * call (-1 when the Windows file size lookup fails).
 */
int
LockRRD(FILE *rrdfile)
{
    int status;
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat info;

    /* _locking() takes a byte count, so look up the current file size */
    if ( _fstat( fd, &info ) != 0 ) {
        status = -1;
    } else {
        status = _locking ( fd, _LK_NBLCK, info.st_size );
    }
#else
    struct flock request;

    request.l_type   = F_WRLCK;   /* exclusive write lock */
    request.l_whence = SEEK_SET;  /* l_start is relative to start of file */
    request.l_start  = 0;         /* begin at the first byte */
    request.l_len    = 0;         /* 0 = up to end of file, i.e. whole file */

    status = fcntl(fd, F_SETLK, &request);
#endif

    return(status);
}
1469
1470
1471 #ifdef HAVE_MMAP
1472 info_t
1473 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1474                unsigned short CDP_scratch_idx, 
1475 #ifndef DEBUG
1476 FILE UNUSED(*rrd_file),
1477 #else
1478 FILE *rrd_file,
1479 #endif
1480                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1481 #else
1482 info_t
1483 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1484                unsigned short CDP_scratch_idx, FILE *rrd_file,
1485                    info_t *pcdp_summary, time_t *rra_time)
1486 #endif
1487 {
1488    unsigned long ds_idx, cdp_idx;
1489    infoval iv;
1490   
1491    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1492    {
1493       /* compute the cdp index */
1494       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1495 #ifdef DEBUG
1496           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1497              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1498              rrd -> rra_def[rra_idx].cf_nam);
1499 #endif 
1500       if (pcdp_summary != NULL)
1501           {
1502              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1503              /* append info to the return hash */
1504                  pcdp_summary = info_push(pcdp_summary,
1505                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1506                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1507                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1508          RD_I_VAL, iv);
1509           }
1510 #ifdef HAVE_MMAP
1511           memcpy((char *)rrd_mmaped_file + *rra_current,
1512                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1513                           sizeof(rrd_value_t));
1514 #else
1515           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1516                  sizeof(rrd_value_t),1,rrd_file) != 1)
1517           { 
1518              rrd_set_error("writing rrd");
1519              return 0;
1520           }
1521 #endif
1522           *rra_current += sizeof(rrd_value_t);
1523         }
1524         return (pcdp_summary);
1525 }