make rrdupdate really light as it was intended in the first place -- Peter Breitenlohn...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.15  Copyright by Tobi Oetiker, 1997-2006
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * Native WIN32 builds lack gettimeofday(); struct timeval is also
 * supplied here unless the compiler is MinGW.  What follows is a quick,
 * minimal stand-in built on _ftime().
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t tv_sec;   /* seconds */
    long   tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int tz_minuteswest;  /* minutes W of Greenwich */
    int tz_dsttime;      /* type of dst correction */
};

/*
 * Fill *t from _ftime(): whole seconds go to tv_sec, the millisecond
 * field is scaled by 1000 into tv_usec.  The tz argument is never read.
 * Always returns 0.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz)
{
    struct _timeb now;

    (void) tz;  /* accepted only to mirror the usual two-argument call */

    _ftime(&now);
    t->tv_usec = now.millitm * 1000;
    t->tv_sec  = now.time;

    return 0;
}

#endif
/*
 * Normalize a time value as returned by gettimeofday so that its
 * microsecond part is never negative: when tv_usec is below zero, one
 * second is borrowed from tv_sec and added to tv_usec as 1000000 usec.
 * A single borrow is performed; non-negative values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec  -= 1;
}
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
93
/*
 * IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * X is expanded twice, so pass an expression without side effects.
 * The expansion deliberately carries no trailing semicolon so the macro
 * can be used inside a larger expression or ahead of an `else`.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, argv + optind + 1, result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
/*
 * rrd_update: command-line style update entry point.
 *
 * Parses an optional -t / --template argument, then expects the RRD
 * filename followed by at least one data argument, and forwards the
 * remainder to rrd_update_r().
 *
 * Returns rrd_update_r()'s result, or -1 (with the rrd error string
 * set) on an unknown option or when too few arguments remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template",      required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *tmplt = NULL;
    int   opt;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    for (;;) {
        int option_index = 0;

        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF)
            break;

        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc-optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1, argv + optind + 1);
}
187
188 int
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
190 {
191    return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
193
194 int
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     FILE             *rrd_file;
238     rrd_t            rrd;
239     time_t           current_time = 0;
240     time_t           rra_time = 0;       /* time of update for a RRA */
241     unsigned long    current_time_usec=0;/* microseconds part of current time */
242     struct timeval   tmp_time;           /* used for time conversion */
243
244     char             **updvals;
245     int              schedule_smooth = 0;
246         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247                                          /* a vector of future Holt-Winters seasonal coefs */
248     unsigned long    elapsed_pdp_st;
249                                          /* number of elapsed PDP steps since last update */
250     unsigned long    *rra_step_cnt = NULL;
251                                          /* number of rows to be updated in an RRA for a data
252                                           * value. */
253     unsigned long    start_pdp_offset;
254                                          /* number of PDP steps since the last update that
255                                           * are assigned to the first CDP to be generated
256                                           * since the last update. */
257     unsigned short   scratch_idx;
258                                          /* index into the CDP scratch array */
259     enum cf_en       current_cf;
260                                          /* numeric id of the current consolidation function */
261     rpnstack_t       rpnstack; /* used for COMPUTE DS */
262     int              version;  /* rrd version */
263     char             *endptr; /* used in the conversion */
264 #ifdef HAVE_MMAP
265     void             *rrd_mmaped_file;
266     unsigned long    rrd_filesize;
267 #endif
268
269     rpnstack_init(&rpnstack);
270
271     /* need at least 1 arguments: data. */
272     if (argc < 1) {
273         rrd_set_error("Not enough arguments");
274         return -1;
275     }
276     
277     
278
279     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
280         return -1;
281     }
282     /* initialize time */
283     version = atoi(rrd.stat_head->version);
284     gettimeofday(&tmp_time, 0);
285     normalize_time(&tmp_time);
286     current_time = tmp_time.tv_sec;
287     if(version >= 3) {
288         current_time_usec = tmp_time.tv_usec;
289     }
290     else {
291         current_time_usec = 0;
292     }
293
294     rra_current = rra_start = rra_begin = ftell(rrd_file);
295     /* This is defined in the ANSI C standard, section 7.9.5.3:
296
297         When a file is opened with udpate mode ('+' as the second
298         or third character in the ... list of mode argument
299         variables), both input and ouptut may be performed on the
300         associated stream.  However, ...  input may not be directly
301         followed by output without an intervening call to a file
302         positioning function, unless the input oepration encounters
303         end-of-file. */
304 #ifdef HAVE_MMAP
305     fseek(rrd_file, 0, SEEK_END);
306     rrd_filesize = ftell(rrd_file);
307     fseek(rrd_file, rra_current, SEEK_SET);
308 #else
309     fseek(rrd_file, 0, SEEK_CUR);
310 #endif
311
312     
313     /* get exclusive lock to whole file.
314      * lock gets removed when we close the file.
315      */
316     if (LockRRD(rrd_file) != 0) {
317       rrd_set_error("could not lock RRD");
318       rrd_free(&rrd);
319       fclose(rrd_file);
320       return(-1);   
321     } 
322
323     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
324         rrd_set_error("allocating updvals pointer array");
325         rrd_free(&rrd);
326         fclose(rrd_file);
327         return(-1);
328     }
329
330     if ((pdp_temp = malloc(sizeof(rrd_value_t)
331                            *rrd.stat_head->ds_cnt))==NULL){
332         rrd_set_error("allocating pdp_temp ...");
333         free(updvals);
334         rrd_free(&rrd);
335         fclose(rrd_file);
336         return(-1);
337     }
338
339     if ((tmpl_idx = malloc(sizeof(unsigned long)
340                            *(rrd.stat_head->ds_cnt+1)))==NULL){
341         rrd_set_error("allocating tmpl_idx ...");
342         free(pdp_temp);
343         free(updvals);
344         rrd_free(&rrd);
345         fclose(rrd_file);
346         return(-1);
347     }
348     /* initialize tmplt redirector */
349     /* default config example (assume DS 1 is a CDEF DS)
350        tmpl_idx[0] -> 0; (time)
351        tmpl_idx[1] -> 1; (DS 0)
352        tmpl_idx[2] -> 3; (DS 2)
353        tmpl_idx[3] -> 4; (DS 3) */
354     tmpl_idx[0] = 0; /* time */
355     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
356         {
357            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
358               tmpl_idx[ii++]=i;
359         }
360     tmpl_cnt= ii;
361
362     if (tmplt) {
363         /* we should work on a writeable copy here */
364         char *dsname;
365         unsigned int tmpl_len;
366         tmplt = strdup(tmplt);
367         dsname = tmplt;
368         tmpl_cnt = 1; /* the first entry is the time */
369         tmpl_len = strlen(tmplt);
370         for(i=0;i<=tmpl_len ;i++) {
371             if (tmplt[i] == ':' || tmplt[i] == '\0') {
372                 tmplt[i] = '\0';
373                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
374                     rrd_set_error("tmplt contains more DS definitions than RRD");
375                     free(updvals); free(pdp_temp);
376                     free(tmpl_idx); rrd_free(&rrd);
377                     fclose(rrd_file); return(-1);
378                 }
379                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
380                     rrd_set_error("unknown DS name '%s'",dsname);
381                     free(updvals); free(pdp_temp);
382                     free(tmplt);
383                     free(tmpl_idx); rrd_free(&rrd);
384                     fclose(rrd_file); return(-1);
385                 } else {
386                   /* the first element is always the time */
387                   tmpl_idx[tmpl_cnt-1]++; 
388                   /* go to the next entry on the tmplt */
389                   dsname = &tmplt[i+1];
390                   /* fix the damage we did before */
391                   if (i<tmpl_len) {
392                      tmplt[i]=':';
393                   } 
394
395                 }
396             }       
397         }
398         free(tmplt);
399     }
400     if ((pdp_new = malloc(sizeof(rrd_value_t)
401                           *rrd.stat_head->ds_cnt))==NULL){
402         rrd_set_error("allocating pdp_new ...");
403         free(updvals);
404         free(pdp_temp);
405         free(tmpl_idx);
406         rrd_free(&rrd);
407         fclose(rrd_file);
408         return(-1);
409     }
410
411 #ifdef HAVE_MMAP
412     rrd_mmaped_file = mmap(0, 
413                         rrd_filesize, 
414                         PROT_READ | PROT_WRITE, 
415                         MAP_SHARED, 
416                         fileno(rrd_file), 
417                         0);
418     if (rrd_mmaped_file == MAP_FAILED) {
419         rrd_set_error("error mmapping file %s", filename);
420         free(updvals);
421         free(pdp_temp);
422         free(tmpl_idx);
423         rrd_free(&rrd);
424         fclose(rrd_file);
425         return(-1);
426     }
427 #endif
428     /* loop through the arguments. */
429     for(arg_i=0; arg_i<argc;arg_i++) {
430         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
431         char *step_start = stepper;
432         char *p;
433         char *parsetime_error = NULL;
434         enum {atstyle, normal} timesyntax;
435         struct rrd_time_value ds_tv;
436         if (stepper == NULL){
437                 rrd_set_error("failed duplication argv entry");
438                 free(updvals);
439                 free(pdp_temp);  
440                 free(tmpl_idx);
441                 rrd_free(&rrd);
442 #ifdef HAVE_MMAP
443                 munmap(rrd_mmaped_file, rrd_filesize);
444 #endif
445                 fclose(rrd_file);
446                 return(-1);
447          }
448         /* initialize all ds input to unknown except the first one
449            which has always got to be set */
450         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
451         strcpy(stepper,argv[arg_i]);
452         updvals[0]=stepper;
453         /* separate all ds elements; first must be examined separately
454            due to alternate time syntax */
455         if ((p=strchr(stepper,'@'))!=NULL) {
456             timesyntax = atstyle;
457             *p = '\0';
458             stepper = p+1;
459         } else if ((p=strchr(stepper,':'))!=NULL) {
460             timesyntax = normal;
461             *p = '\0';
462             stepper = p+1;
463         } else {
464             rrd_set_error("expected timestamp not found in data source from %s:...",
465                           argv[arg_i]);
466             free(step_start);
467             break;
468         }
469         ii=1;
470         updvals[tmpl_idx[ii]] = stepper;
471         while (*stepper) {
472             if (*stepper == ':') {
473                 *stepper = '\0';
474                 ii++;
475                 if (ii<tmpl_cnt){                   
476                     updvals[tmpl_idx[ii]] = stepper+1;
477                 }
478             }
479             stepper++;
480         }
481
482         if (ii != tmpl_cnt-1) {
483             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
484                           tmpl_cnt-1, ii, argv[arg_i]);
485             free(step_start);
486             break;
487         }
488         
489         /* get the time from the reading ... handle N */
490         if (timesyntax == atstyle) {
491             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
492                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
493                 free(step_start);
494                 break;
495             }
496             if (ds_tv.type == RELATIVE_TO_END_TIME ||
497                 ds_tv.type == RELATIVE_TO_START_TIME) {
498                 rrd_set_error("specifying time relative to the 'start' "
499                               "or 'end' makes no sense here: %s",
500                               updvals[0]);
501                 free(step_start);
502                 break;
503             }
504
505             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
506             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
507             
508         } else if (strcmp(updvals[0],"N")==0){
509             gettimeofday(&tmp_time, 0);
510             normalize_time(&tmp_time);
511             current_time = tmp_time.tv_sec;
512             current_time_usec = tmp_time.tv_usec;
513         } else {
514             double tmp;
515             tmp = strtod(updvals[0], 0);
516             current_time = floor(tmp);
517             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
518         }
519         /* dont do any correction for old version RRDs */
520         if(version < 3) 
521             current_time_usec = 0;
522         
523         if(current_time < rrd.live_head->last_up || 
524           (current_time == rrd.live_head->last_up && 
525            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
526             rrd_set_error("illegal attempt to update using time %ld when "
527                           "last update time is %ld (minimum one second step)",
528                           current_time, rrd.live_head->last_up);
529             free(step_start);
530             break;
531         }
532         
533         
534         /* seek to the beginning of the rra's */
535         if (rra_current != rra_begin) {
536 #ifndef HAVE_MMAP
537             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
538                 rrd_set_error("seek error in rrd");
539                 free(step_start);
540                 break;
541             }
542 #endif
543             rra_current = rra_begin;
544         }
545         rra_start = rra_begin;
546
547         /* when was the current pdp started */
548         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
549         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
550
551         /* when did the last pdp_st occur */
552         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
553         occu_pdp_st = current_time - occu_pdp_age;
554
555         /* interval = current_time - rrd.live_head->last_up; */
556         interval    = (double)(current_time - rrd.live_head->last_up) 
557                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
558
559         if (occu_pdp_st > proc_pdp_st){
560             /* OK we passed the pdp_st moment*/
561             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
562                                                               * occurred before the latest
563                                                               * pdp_st moment*/
564             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
565             post_int = occu_pdp_age;                         /* how much after it */
566             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
567         } else {
568             pre_int = interval;
569             post_int = 0;
570         }
571
572 #ifdef DEBUG
573         printf(
574                "proc_pdp_age %lu\t"
575                "proc_pdp_st %lu\t" 
576                "occu_pfp_age %lu\t" 
577                "occu_pdp_st %lu\t"
578                "int %lf\t"
579                "pre_int %lf\t"
580                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
581                 occu_pdp_age, occu_pdp_st,
582                interval, pre_int, post_int);
583 #endif
584     
585         /* process the data sources and update the pdp_prep 
586          * area accordingly */
587         for(i=0;i<rrd.stat_head->ds_cnt;i++){
588             enum dst_en dst_idx;
589             dst_idx= dst_conv(rrd.ds_def[i].dst);
590
591             /* make sure we do not build diffs with old last_ds values */
592             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
593                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
594                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
595             }
596
597             /* NOTE: DST_CDEF should never enter this if block, because
598              * updvals[i+1][0] is initialized to 'U'; unless the caller
599              * accidently specified a value for the DST_CDEF. To handle 
600               * this case, an extra check is required. */
601
602             if((updvals[i+1][0] != 'U') &&
603                    (dst_idx != DST_CDEF) &&
604                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
605                double rate = DNAN;
606                /* the data source type defines how to process the data */
607                 /* pdp_new contains rate * time ... eg the bytes
608                  * transferred during the interval. Doing it this way saves
609                  * a lot of math operations */
610                 
611
612                 switch(dst_idx){
613                 case DST_COUNTER:
614                 case DST_DERIVE:
615                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
616                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
617                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
618                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
619                                  break;
620                             }
621                        }
622                        if (rrd_test_error()){
623                             break;
624                        }
625                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
626                        if(dst_idx == DST_COUNTER) {
627                           /* simple overflow catcher suggested by Andres Kroonmaa */
628                           /* this will fail terribly for non 32 or 64 bit counters ... */
629                           /* are there any others in SNMP land ? */
630                           if (pdp_new[i] < (double)0.0 ) 
631                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
632                           if (pdp_new[i] < (double)0.0 ) 
633                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
634                        }
635                        rate = pdp_new[i] / interval;
636                     }
637                    else {
638                      pdp_new[i]= DNAN;          
639                    }
640                    break;
641                 case DST_ABSOLUTE:
642                     errno = 0;
643                     pdp_new[i] = strtod(updvals[i+1],&endptr);
644                     if (errno > 0){
645                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
646                         break;
647                     };
648                     if (endptr[0] != '\0'){
649                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
650                         break;
651                     }
652                     rate = pdp_new[i] / interval;                 
653                     break;
654                 case DST_GAUGE:
655                     errno = 0;
656                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
657                     if (errno > 0){
658                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
659                         break;
660                     };
661                     if (endptr[0] != '\0'){
662                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
663                         break;
664                     }
665                     rate = pdp_new[i] / interval;                  
666                     break;
667                 default:
668                     rrd_set_error("rrd contains unknown DS type : '%s'",
669                                   rrd.ds_def[i].dst);
670                     break;
671                 }
672                 /* break out of this for loop if the error string is set */
673                 if (rrd_test_error()){
674                     break;
675                 }
676                /* make sure pdp_temp is neither too large or too small
677                 * if any of these occur it becomes unknown ...
678                 * sorry folks ... */
679                if ( ! isnan(rate) && 
680                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
681                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
682                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
683                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
684                   pdp_new[i] = DNAN;
685                }               
686             } else {
687                 /* no news is news all the same */
688                 pdp_new[i] = DNAN;
689             }
690
691             
692             /* make a copy of the command line argument for the next run */
693 #ifdef DEBUG
694             fprintf(stderr,
695                     "prep ds[%lu]\t"
696                     "last_arg '%s'\t"
697                     "this_arg '%s'\t"
698                     "pdp_new %10.2f\n",
699                     i,
700                     rrd.pdp_prep[i].last_ds,
701                     updvals[i+1], pdp_new[i]);
702 #endif
703             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
704                 strncpy(rrd.pdp_prep[i].last_ds,
705                         updvals[i+1],LAST_DS_LEN-1);
706                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
707             }
708         }
709         /* break out of the argument parsing loop if the error_string is set */
710         if (rrd_test_error()){
711             free(step_start);
712             break;
713         }
714         /* has a pdp_st moment occurred since the last run ? */
715
716         if (proc_pdp_st == occu_pdp_st){
717             /* no we have not passed a pdp_st moment. therefore update is simple */
718
719             for(i=0;i<rrd.stat_head->ds_cnt;i++){
720                 if(isnan(pdp_new[i])) {            
721                     /* this is not realy accurate if we use subsecond data arival time
722                        should have thought of it when going subsecond resolution ...
723                        sorry next format change we will have it! */
724                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
725                 } else {
726                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
727                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
728                      } else {
729                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
730                      }
731                 }
732 #ifdef DEBUG
733                 fprintf(stderr,
734                         "NO PDP  ds[%lu]\t"
735                         "value %10.2f\t"
736                         "unkn_sec %5lu\n",
737                         i,
738                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
739                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
740 #endif
741             }   
742         } else {
743             /* an pdp_st has occurred. */
744
745             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
746              * occurred up to the last run.        
747             pdp_new[] contains rate*seconds from the latest run.
748             pdp_temp[] will contain the rate for cdp */
749
750             for(i=0;i<rrd.stat_head->ds_cnt;i++){
751                 /* update pdp_prep to the current pdp_st. */
752                 double pre_unknown = 0.0;               
753                 if(isnan(pdp_new[i]))
754                     /* a final bit of unkonwn to be added bevore calculation
755                      * we use a tempaorary variable for this so that we 
756                      * don't have to turn integer lines before using the value */                
757                     pre_unknown = pre_int;
758                 else {
759                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
760                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
761                      } else {
762                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
763                      }
764                  }
765                 
766
767                 /* if too much of the pdp_prep is unknown we dump it */
768                 if ( 
769                     /* removed because this does not agree with the definition
770                        a heart beat can be unknown */
771                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
772                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
773                     /* if the interval is larger than mrhb we get NAN */
774                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775                     (occu_pdp_st-proc_pdp_st <= 
776                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
777                     pdp_temp[i] = DNAN;
778                 } else {
779                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780                         / ((double)(occu_pdp_st - proc_pdp_st
781                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
782                             -pre_unknown);
783                 }
784
785                 /* process CDEF data sources; remember each CDEF DS can
786                  * only reference other DS with a lower index number */
787             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
788                    rpnp_t *rpnp;
789                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790                    /* substitute data values for OP_VARIABLE nodes */
791                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
792                    {
793                           if (rpnp[ii].op == OP_VARIABLE) {
794                                  rpnp[ii].op = OP_NUMBER;
795                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
796                           }
797                    }
798                    /* run the rpn calculator */
799                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
800                           free(rpnp);
801                           break; /* exits the data sources pdp_temp loop */
802                    }
803                 }
804         
805                 /* make pdp_prep ready for the next run */
806                 if(isnan(pdp_new[i])){
807                     /* this is not really accurate if we use subsecond data arrival time
808                        should have thought of it when going subsecond resolution ...
809                        sorry next format change we will have it! */
810                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
811                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
812                 } else {
813                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
815                         pdp_new[i]/interval*post_int;
816                 }
817
818 #ifdef DEBUG
819                 fprintf(stderr,
820                         "PDP UPD ds[%lu]\t"
821                         "pdp_temp %10.2f\t"
822                         "new_prep %10.2f\t"
823                         "new_unkn_sec %5lu\n",
824                         i, pdp_temp[i],
825                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
826                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 #endif
828             }
829
830                 /* if there were errors during the last loop, bail out here */
831             if (rrd_test_error()){
832                free(step_start);
833                break;
834             }
835
836                 /* compute the number of elapsed pdp_st moments */
837                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
838 #ifdef DEBUG
839                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
840 #endif
841                 if (rra_step_cnt == NULL)
842                 {
843                    rra_step_cnt = (unsigned long *) 
844                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
845                 }
846
847             for(i = 0, rra_start = rra_begin;
848                 i < rrd.stat_head->rra_cnt;
849             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
850                 i++)
851                 {
852                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855         if (start_pdp_offset <= elapsed_pdp_st) {
856            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
857                       rrd.rra_def[i].pdp_cnt + 1;
858             } else {
859                    rra_step_cnt[i] = 0;
860                 }
861
862                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
863                 {
864                    /* If this is a bulk update, we need to skip ahead in the seasonal
865                         * arrays so that they will be correct for the next observed value;
866                         * note that for the bulk update itself, no update will occur to
867                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
868                         * be set to DNAN. */
869            if (rra_step_cnt[i] > 2) 
870                    {
871                           /* skip update by resetting rra_step_cnt[i],
872                            * note that this is not data source specific; this is due
873                            * to the bulk update, not a DNAN value for the specific data
874                            * source. */
875                           rra_step_cnt[i] = 0;
876               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
877                              &last_seasonal_coef);
878                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879                              &seasonal_coef);
880                    }
881                 
882                   /* periodically run a smoother for seasonal effects */
883                   /* Need to use first cdp parameter buffer to track
884                    * burnin (burnin requires a specific smoothing schedule).
885                    * The CDP_init_seasonal parameter is really an RRA level,
886                    * not a data source within RRA level parameter, but the rra_def
887                    * is read only for rrd_update (not flushed to disk). */
888                   iii = i*(rrd.stat_head -> ds_cnt);
889                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
890                           <= BURNIN_CYCLES)
891                   {
892                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
893                                  > rrd.rra_def[i].row_cnt - 1) {
894                            /* mark off one of the burnin cycles */
895                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896                        schedule_smooth = 1;
897                          }  
898                   } else {
899                          /* someone has no doubt invented a trick to deal with this
900                           * wrap around, but at least this code is clear. */
901                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902                              rrd.rra_ptr[i].cur_row)
903                          {
904                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905                                   * mapping between PDP and CDP */
906                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908                                  {
909 #ifdef DEBUG
910                                         fprintf(stderr,
911                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
913                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
914 #endif
915                                         schedule_smooth = 1;
916                                  }
917              } else {
918                                  /* can't rely on negative numbers because we are working with
919                                   * unsigned values */
920                                  /* Don't need modulus here. If we've wrapped more than once, only
921                                   * one smooth is executed at the end. */
922                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925                                  {
926 #ifdef DEBUG
927                                         fprintf(stderr,
928                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
930                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932                                         schedule_smooth = 1;
933                                  }
934                          }
935                   }
936
937               rra_current = ftell(rrd_file); 
938                 } /* if cf is DEVSEASONAL or SEASONAL */
939
940         if (rrd_test_error()) break;
941
942                     /* update CDP_PREP areas */
943                     /* loop over data sources within each RRA */
944                     for(ii = 0;
945                         ii < rrd.stat_head->ds_cnt;
946                         ii++)
947                         {
948                         
949                         /* iii indexes the CDP prep area for this data source within the RRA */
950                         iii=i*rrd.stat_head->ds_cnt+ii;
951
952                         if (rrd.rra_def[i].pdp_cnt > 1) {
953                           
954                            if (rra_step_cnt[i] > 0) {
955                            /* If we are in this block, at least 1 CDP value will be written to
956                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957                                 * to be written, then the "fill in" value is the CDP_secondary_val
958                                 * entry. */
959                                   if (isnan(pdp_temp[ii]))
960                   {
961                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
963                                   } else {
964                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
965                                           * CDP data entries. No matter the CF, the value is the same because
966                                           * the average, max, min, and last of a list of identical values is
967                                           * the same, namely, the value itself. */
968                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
969                                   }
970                      
971                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972                                       > rrd.rra_def[i].pdp_cnt*
973                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
974                                   {
975                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976                                          /* initialize carry over */
977                                          if (current_cf == CF_AVERAGE) {
978                                                    if (isnan(pdp_temp[ii])) { 
979                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
980                                                    } else {
981                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
983                                                    }
984                                          } else {
985                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
986                                          }
987                                   } else {
988                                          rrd_value_t cum_val, cur_val; 
989                                      switch (current_cf) {
990                                                 case CF_AVERAGE:
991                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
993                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994                                                (cum_val + cur_val * start_pdp_offset) /
995                                            (rrd.rra_def[i].pdp_cnt
996                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997                                                    /* initialize carry over value */
998                                                    if (isnan(pdp_temp[ii])) { 
999                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000                                                    } else {
1001                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003                                                    }
1004                                                    break;
1005                                                 case CF_MAXIMUM:
1006                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1008 #ifdef DEBUG
1009                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010                                                           isnan(pdp_temp[ii])) {
1011                                                      fprintf(stderr,
1012                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1013                                                                 i,ii);
1014                                                          exit(-1);
1015                                                   }
1016 #endif
1017                                                   if (cur_val > cum_val)
1018                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1019                                                   else
1020                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021                                                   /* initialize carry over value */
1022                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1023                                                   break;
1024                                                 case CF_MINIMUM:
1025                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1027 #ifdef DEBUG
1028                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029                                                           isnan(pdp_temp[ii])) {
1030                                                      fprintf(stderr,
1031                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1032                                                                 i,ii);
1033                                                          exit(-1);
1034                                                   }
1035 #endif
1036                                                   if (cur_val < cum_val)
1037                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1038                                                   else
1039                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040                                                   /* initialize carry over value */
1041                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042                                                   break;
1043                                                 case CF_LAST:
1044                                                 default:
1045                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046                                                    /* initialize carry over value */
1047                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048                                                 break;
1049                                          }
1050                                   } /* endif meets xff value requirement for a valid value */
1051                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1052                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053                                   if (isnan(pdp_temp[ii]))
1054                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1055                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1056                                   else
1057                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058                } else  /* rra_step_cnt[i]  == 0 */
1059                            {
1060 #ifdef DEBUG
1061                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1063                                          i,ii);
1064                                   } else {
1065                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1067                                   }
1068 #endif
1069                                   if (isnan(pdp_temp[ii])) {
1070                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1072                                   {
1073                                          if (current_cf == CF_AVERAGE) {
1074                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1075                                                    elapsed_pdp_st;
1076                                          } else {
1077                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078                                          }
1079 #ifdef DEBUG
1080                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1082 #endif
1083                                   } else {
1084                                          switch (current_cf) {
1085                                          case CF_AVERAGE:
1086                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087                                                    elapsed_pdp_st;
1088                                                 break;
1089                                          case CF_MINIMUM:
1090                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1092                                                 break; 
1093                                          case CF_MAXIMUM:
1094                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096                                                 break; 
1097                                          case CF_LAST:
1098                                          default:
1099                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100                                                 break;
1101                                          }
1102                                   }
1103                            }
1104                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105                            if (elapsed_pdp_st > 2)
1106                            {
1107                                    switch (current_cf) {
1108                                    case CF_AVERAGE:
1109                                    default:
1110                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1112                                           break;
1113                    case CF_SEASONAL:
1114                                    case CF_DEVSEASONAL:
1115                                           /* need to update cached seasonal values, so they are consistent
1116                                            * with the bulk update */
1117                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118                                            * CDP_last_deviation are the same. */
1119                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120                                                  last_seasonal_coef[ii];
1121                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122                                                  seasonal_coef[ii];
1123                                           break;
1124                    case CF_HWPREDICT:
1125                                           /* need to update the null_count and last_null_count.
1126                                            * even do this for non-DNAN pdp_temp because the
1127                                            * algorithm is not learning from batch updates. */
1128                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1129                                                  elapsed_pdp_st;
1130                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1131                                                  elapsed_pdp_st - 1;
1132                                           /* fall through */
1133                                    case CF_DEVPREDICT:
1134                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1136                                           break;
1137                    case CF_FAILURES:
1138                                           /* do not count missed bulk values as failures */
1139                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141                                           /* need to reset violations buffer.
1142                                            * could do this more carefully, but for now, just
1143                                            * assume a bulk update wipes away all violations. */
1144                       erase_violations(&rrd, iii, i);
1145                                           break;
1146                                    }
1147                            } 
1148                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1149
1150                         if (rrd_test_error()) break;
1151
1152                         } /* endif data sources loop */
1153         } /* end RRA Loop */
1154
1155                 /* this loop is only entered if elapsed_pdp_st < 3 */
1156                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1157                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1158                 {
1159                for(i = 0, rra_start = rra_begin;
1160                    i < rrd.stat_head->rra_cnt;
1161                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1162                    i++)
1163                    {
1164                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1165
1166                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1168                           {
1169                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1171                                 &seasonal_coef);
1172                  rra_current = ftell(rrd_file);
1173                           }
1174                           if (rrd_test_error()) break;
1175                       /* loop over data sources within each RRA */
1176                       for(ii = 0;
1177                           ii < rrd.stat_head->ds_cnt;
1178                           ii++)
1179                           {
1180                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182                                     scratch_idx, seasonal_coef);
1183                           }
1184            } /* end RRA Loop */
1185                    if (rrd_test_error()) break;
1186             } /* end elapsed_pdp_st loop */
1187
1188                 if (rrd_test_error()) break;
1189
1190                 /* Ready to write to disk */
1191                 /* Move sequentially through the file, writing one RRA at a time.
1192                  * Note this architecture divorces the computation of CDP with
1193                  * flushing updated RRA entries to disk. */
1194             for(i = 0, rra_start = rra_begin;
1195                 i < rrd.stat_head->rra_cnt;
1196             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1197                 i++) {
1198                 /* is there anything to write for this RRA? If not, continue. */
1199         if (rra_step_cnt[i] == 0) continue;
1200
1201                 /* write the first row */
1202 #ifdef DEBUG
1203         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1204 #endif
1205             rrd.rra_ptr[i].cur_row++;
1206             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208                 /* position on the first row */
1209                 rra_pos_tmp = rra_start +
1210                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211                 if(rra_pos_tmp != rra_current) {
1212 #ifndef HAVE_MMAP
1213                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1214                       rrd_set_error("seek error in rrd");
1215                       break;
1216                    }
1217 #endif
1218                    rra_current = rra_pos_tmp;
1219                 }
1220
1221 #ifdef DEBUG
1222             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1223 #endif
1224                 scratch_idx = CDP_primary_val;
1225                 if (pcdp_summary != NULL)
1226                 {
1227                    rra_time = (current_time - current_time 
1228                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1229                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1230                 }
1231 #ifdef HAVE_MMAP
1232                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1233                    pcdp_summary, &rra_time, rrd_mmaped_file);
1234 #else
1235                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1236                    pcdp_summary, &rra_time);
1237 #endif
1238                 if (rrd_test_error()) break;
1239
1240                 /* write other rows of the bulk update, if any */
1241                 scratch_idx = CDP_secondary_val;
1242                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1243                 {
1244                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1245                    {
1246 #ifdef DEBUG
1247               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1248                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1249 #endif
1250                           /* wrap */
1251                           rrd.rra_ptr[i].cur_row = 0;
1252                           /* seek back to beginning of current rra */
1253                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1254                           {
1255                          rrd_set_error("seek error in rrd");
1256                          break;
1257                           }
1258 #ifdef DEBUG
1259                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1260 #endif
1261                           rra_current = rra_start;
1262                    }
1263                    if (pcdp_summary != NULL)
1264                    {
1265                       rra_time = (current_time - current_time 
1266                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1267                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1268                    }
1269 #ifdef HAVE_MMAP
1270                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1271                       pcdp_summary, &rra_time, rrd_mmaped_file);
1272 #else
1273                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1274                       pcdp_summary, &rra_time);
1275 #endif
1276                 }
1277                 
1278                 if (rrd_test_error())
1279                   break;
1280                 } /* RRA LOOP */
1281
1282             /* break out of the argument parsing loop if error_string is set */
1283             if (rrd_test_error()){
1284                    free(step_start);
1285                    break;
1286             } 
1287             
1288         } /* endif a pdp_st has occurred */ 
1289         rrd.live_head->last_up = current_time;
1290         rrd.live_head->last_up_usec = current_time_usec; 
1291         free(step_start);
1292     } /* function argument loop */
1293
1294     if (seasonal_coef != NULL) free(seasonal_coef);
1295     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1296         if (rra_step_cnt != NULL) free(rra_step_cnt);
1297     rpnstack_free(&rpnstack);
1298
1299 #ifdef HAVE_MMAP
1300     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1301             rrd_set_error("error writing(unmapping) file: %s", filename);
1302     }
1303 #endif    
1304     /* if we got here and if there is an error and if the file has not been
1305      * written to, then close things up and return. */
1306     if (rrd_test_error()) {
1307         free(updvals);
1308         free(tmpl_idx);
1309         rrd_free(&rrd);
1310         free(pdp_temp);
1311         free(pdp_new);
1312         fclose(rrd_file);
1313         return(-1);
1314     }
1315
1316     /* aargh ... that was tough ... so many loops ... anyway, its done.
1317      * we just need to write back the live header portion now*/
1318
1319     if (fseek(rrd_file, (sizeof(stat_head_t)
1320                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1321                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1322               SEEK_SET) != 0) {
1323         rrd_set_error("seek rrd for live header writeback");
1324         free(updvals);
1325         free(tmpl_idx);
1326         rrd_free(&rrd);
1327         free(pdp_temp);
1328         free(pdp_new);
1329         fclose(rrd_file);
1330         return(-1);
1331     }
1332
1333     if(version >= 3) {
1334             if(fwrite( rrd.live_head,
1335                        sizeof(live_head_t), 1, rrd_file) != 1){
1336                 rrd_set_error("fwrite live_head to rrd");
1337                 free(updvals);
1338                 rrd_free(&rrd);
1339                 free(tmpl_idx);
1340                 free(pdp_temp);
1341                 free(pdp_new);
1342                 fclose(rrd_file);
1343                 return(-1);
1344             }
1345     }
1346     else {
1347             if(fwrite( &rrd.live_head->last_up,
1348                        sizeof(time_t), 1, rrd_file) != 1){
1349                 rrd_set_error("fwrite live_head to rrd");
1350                 free(updvals);
1351                 rrd_free(&rrd);
1352                 free(tmpl_idx);
1353                 free(pdp_temp);
1354                 free(pdp_new);
1355                 fclose(rrd_file);
1356                 return(-1);
1357             }
1358     }
1359             
1360
1361     if(fwrite( rrd.pdp_prep,
1362                sizeof(pdp_prep_t),
1363                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1364         rrd_set_error("ftwrite pdp_prep to rrd");
1365         free(updvals);
1366         rrd_free(&rrd);
1367         free(tmpl_idx);
1368         free(pdp_temp);
1369         free(pdp_new);
1370         fclose(rrd_file);
1371         return(-1);
1372     }
1373
1374     if(fwrite( rrd.cdp_prep,
1375                sizeof(cdp_prep_t),
1376                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1377        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1378
1379         rrd_set_error("ftwrite cdp_prep to rrd");
1380         free(updvals);
1381         free(tmpl_idx);
1382         rrd_free(&rrd);
1383         free(pdp_temp);
1384         free(pdp_new);
1385         fclose(rrd_file);
1386         return(-1);
1387     }
1388
1389     if(fwrite( rrd.rra_ptr,
1390                sizeof(rra_ptr_t), 
1391                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1392         rrd_set_error("fwrite rra_ptr to rrd");
1393         free(updvals);
1394         free(tmpl_idx);
1395         rrd_free(&rrd);
1396         free(pdp_temp);
1397         free(pdp_new);
1398         fclose(rrd_file);
1399         return(-1);
1400     }
1401
1402     /* OK now close the files and free the memory */
1403     if(fclose(rrd_file) != 0){
1404         rrd_set_error("closing rrd");
1405         free(updvals);
1406         free(tmpl_idx);
1407         rrd_free(&rrd);
1408         free(pdp_temp);
1409         free(pdp_new);
1410         return(-1);
1411     }
1412
1413     /* calling the smoothing code here guarantees at most
1414          * one smoothing operation per rrd_update call. Unfortunately,
1415          * it is possible with bulk updates, or a long-delayed update
1416          * for smoothing to occur off-schedule. This really isn't
1417          * critical except during the burning cycles. */
1418         if (schedule_smooth)
1419         {
1420           rrd_file = fopen(filename,"rb+");
1421           rra_start = rra_begin;
1422           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1423           {
1424             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1425                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1426             {
1427 #ifdef DEBUG
1428               fprintf(stderr,"Running smoother for rra %ld\n",i);
1429 #endif
1430               apply_smoother(&rrd,i,rra_start,rrd_file);
1431               if (rrd_test_error())
1432                 break;
1433             }
1434             rra_start += rrd.rra_def[i].row_cnt
1435               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1436           }
1437           fclose(rrd_file);
1438         }
1439     rrd_free(&rrd);
1440     free(updvals);
1441     free(tmpl_idx);
1442     free(pdp_new);
1443     free(pdp_temp);
1444     return(0);
1445 }
1446
/*
 * Try to obtain an exclusive lock on the complete RRD file.
 *
 * The request is made in non-blocking mode (F_SETLK with fcntl(), or
 * _LK_NBLCK with _locking() on native Windows), so the call reports
 * failure instead of waiting when the lock cannot be granted.
 * The lock is released when the file is closed.
 *
 * rrdfile  open stream of the RRD to lock
 *
 * returns 0 on success; otherwise the result of the underlying locking
 * call, or -1 when the file size cannot be determined on Windows.
 */
int
LockRRD(FILE *rrdfile)
{
    int status;
    const int fd = fileno(rrdfile);     /* descriptor behind the stream */

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs a byte count, so ask for the file size first */
    if (_fstat(fd, &file_info) != 0) {
        status = -1;
    } else {
        status = _locking(fd, _LK_NBLCK, file_info.st_size);
    }
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;     /* offsets count from start of file */
    whole_file.l_start = 0;             /* begin at the first byte */
    whole_file.l_len = 0;               /* zero length: whole file */
    whole_file.l_type = F_WRLCK;        /* exclusive write lock */

    status = fcntl(fd, F_SETLK, &whole_file);
#endif

    return status;
}
1483
1484
1485 #ifdef HAVE_MMAP
1486 info_t
1487 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1488                unsigned short CDP_scratch_idx, 
1489 #ifndef DEBUG
1490 FILE UNUSED(*rrd_file),
1491 #else
1492 FILE *rrd_file,
1493 #endif
1494                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1495 #else
1496 info_t
1497 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1498                unsigned short CDP_scratch_idx, FILE *rrd_file,
1499                    info_t *pcdp_summary, time_t *rra_time)
1500 #endif
1501 {
1502    unsigned long ds_idx, cdp_idx;
1503    infoval iv;
1504   
1505    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1506    {
1507       /* compute the cdp index */
1508       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1509 #ifdef DEBUG
1510           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1511              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1512              rrd -> rra_def[rra_idx].cf_nam);
1513 #endif 
1514       if (pcdp_summary != NULL)
1515           {
1516              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1517              /* append info to the return hash */
1518                  pcdp_summary = info_push(pcdp_summary,
1519                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1520                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1521                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1522          RD_I_VAL, iv);
1523           }
1524 #ifdef HAVE_MMAP
1525           memcpy((char *)rrd_mmaped_file + *rra_current,
1526                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1527                           sizeof(rrd_value_t));
1528 #else
1529           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1530                  sizeof(rrd_value_t),1,rrd_file) != 1)
1531           { 
1532              rrd_set_error("writing rrd");
1533              return 0;
1534           }
1535 #endif
1536           *rra_current += sizeof(rrd_value_t);
1537         }
1538         return (pcdp_summary);
1539 }