replace malloc/strncpy by strdup
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.15  Copyright by Tobi Oetiker, 1997-2006
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
93
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, argv + optind + 1, result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
144 int
145 rrd_update(int argc, char **argv)
146 {
147     char             *tmplt = NULL;          
148     int rc;
149     optind = 0; opterr = 0;  /* initialize getopt */
150
151     while (1) {
152                 static struct option long_options[] =
153                         {
154                                 {"template",      required_argument, 0, 't'},
155                                 {0,0,0,0}
156                         };
157                 int option_index = 0;
158                 int opt;
159                 opt = getopt_long(argc, argv, "t:", 
160                                                   long_options, &option_index);
161                 
162                 if (opt == EOF)
163                         break;
164                 
165                 switch(opt) {
166                 case 't':
167                         tmplt = optarg;
168                         break;
169                 
170                 case '?':
171                         rrd_set_error("unknown option '%s'",argv[optind-1]);
172                         return(-1);
173                 }
174     }
175
176     /* need at least 2 arguments: filename, data. */
177     if (argc-optind < 2) {
178                 rrd_set_error("Not enough arguments");
179
180                 return -1;
181     }
182  
183         rc = rrd_update_r(argv[optind], tmplt,
184                       argc - optind - 1, argv + optind + 1);
185     return rc;
186 }
187
188 int
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
190 {
191    return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
193
194 int
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     FILE             *rrd_file;
238     rrd_t            rrd;
239     time_t           current_time = 0;
240     time_t           rra_time = 0;       /* time of update for a RRA */
241     unsigned long    current_time_usec=0;/* microseconds part of current time */
242     struct timeval   tmp_time;           /* used for time conversion */
243
244     char             **updvals;
245     int              schedule_smooth = 0;
246         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247                                          /* a vector of future Holt-Winters seasonal coefs */
248     unsigned long    elapsed_pdp_st;
249                                          /* number of elapsed PDP steps since last update */
250     unsigned long    *rra_step_cnt = NULL;
251                                          /* number of rows to be updated in an RRA for a data
252                                           * value. */
253     unsigned long    start_pdp_offset;
254                                          /* number of PDP steps since the last update that
255                                           * are assigned to the first CDP to be generated
256                                           * since the last update. */
257     unsigned short   scratch_idx;
258                                          /* index into the CDP scratch array */
259     enum cf_en       current_cf;
260                                          /* numeric id of the current consolidation function */
261     rpnstack_t       rpnstack; /* used for COMPUTE DS */
262     int              version;  /* rrd version */
263     char             *endptr; /* used in the conversion */
264 #ifdef HAVE_MMAP
265     void             *rrd_mmaped_file;
266     unsigned long    rrd_filesize;
267 #endif
268
269
270     rpnstack_init(&rpnstack);
271
272     /* need at least 1 arguments: data. */
273     if (argc < 1) {
274         rrd_set_error("Not enough arguments");
275         return -1;
276     }
277     
278     
279
280     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
281         return -1;
282     }
283     /* initialize time */
284     version = atoi(rrd.stat_head->version);
285     gettimeofday(&tmp_time, 0);
286     normalize_time(&tmp_time);
287     current_time = tmp_time.tv_sec;
288     if(version >= 3) {
289         current_time_usec = tmp_time.tv_usec;
290     }
291     else {
292         current_time_usec = 0;
293     }
294
295     rra_current = rra_start = rra_begin = ftell(rrd_file);
296     /* This is defined in the ANSI C standard, section 7.9.5.3:
297
298         When a file is opened with udpate mode ('+' as the second
299         or third character in the ... list of mode argument
300         variables), both input and ouptut may be performed on the
301         associated stream.  However, ...  input may not be directly
302         followed by output without an intervening call to a file
303         positioning function, unless the input oepration encounters
304         end-of-file. */
305 #ifdef HAVE_MMAP
306     fseek(rrd_file, 0, SEEK_END);
307     rrd_filesize = ftell(rrd_file);
308     fseek(rrd_file, rra_current, SEEK_SET);
309 #else
310     fseek(rrd_file, 0, SEEK_CUR);
311 #endif
312
313     
314     /* get exclusive lock to whole file.
315      * lock gets removed when we close the file.
316      */
317     if (LockRRD(rrd_file) != 0) {
318       rrd_set_error("could not lock RRD");
319       rrd_free(&rrd);
320       fclose(rrd_file);
321       return(-1);   
322     } 
323
324     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
325         rrd_set_error("allocating updvals pointer array");
326         rrd_free(&rrd);
327         fclose(rrd_file);
328         return(-1);
329     }
330
331     if ((pdp_temp = malloc(sizeof(rrd_value_t)
332                            *rrd.stat_head->ds_cnt))==NULL){
333         rrd_set_error("allocating pdp_temp ...");
334         free(updvals);
335         rrd_free(&rrd);
336         fclose(rrd_file);
337         return(-1);
338     }
339
340     if ((tmpl_idx = malloc(sizeof(unsigned long)
341                            *(rrd.stat_head->ds_cnt+1)))==NULL){
342         rrd_set_error("allocating tmpl_idx ...");
343         free(pdp_temp);
344         free(updvals);
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
349     /* initialize tmplt redirector */
350     /* default config example (assume DS 1 is a CDEF DS)
351        tmpl_idx[0] -> 0; (time)
352        tmpl_idx[1] -> 1; (DS 0)
353        tmpl_idx[2] -> 3; (DS 2)
354        tmpl_idx[3] -> 4; (DS 3) */
355     tmpl_idx[0] = 0; /* time */
356     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
357         {
358            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
359               tmpl_idx[ii++]=i;
360         }
361     tmpl_cnt= ii;
362
363     if (tmplt) {
364         /* we should work on a writeable copy here */
365         char *dsname;
366         unsigned int tmpl_len;
367         tmplt = strdup(tmplt);
368         dsname = tmplt;
369         tmpl_cnt = 1; /* the first entry is the time */
370         tmpl_len = strlen(tmplt);
371         for(i=0;i<=tmpl_len ;i++) {
372             if (tmplt[i] == ':' || tmplt[i] == '\0') {
373                 tmplt[i] = '\0';
374                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
375                     rrd_set_error("tmplt contains more DS definitions than RRD");
376                     free(updvals); free(pdp_temp);
377                     free(tmpl_idx); rrd_free(&rrd);
378                     fclose(rrd_file); return(-1);
379                 }
380                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
381                     rrd_set_error("unknown DS name '%s'",dsname);
382                     free(updvals); free(pdp_temp);
383                     free(tmplt);
384                     free(tmpl_idx); rrd_free(&rrd);
385                     fclose(rrd_file); return(-1);
386                 } else {
387                   /* the first element is always the time */
388                   tmpl_idx[tmpl_cnt-1]++; 
389                   /* go to the next entry on the tmplt */
390                   dsname = &tmplt[i+1];
391                   /* fix the damage we did before */
392                   if (i<tmpl_len) {
393                      tmplt[i]=':';
394                   } 
395
396                 }
397             }       
398         }
399         free(tmplt);
400     }
401     if ((pdp_new = malloc(sizeof(rrd_value_t)
402                           *rrd.stat_head->ds_cnt))==NULL){
403         rrd_set_error("allocating pdp_new ...");
404         free(updvals);
405         free(pdp_temp);
406         free(tmpl_idx);
407         rrd_free(&rrd);
408         fclose(rrd_file);
409         return(-1);
410     }
411
412 #ifdef HAVE_MMAP
413     rrd_mmaped_file = mmap(0, 
414                         rrd_filesize, 
415                         PROT_READ | PROT_WRITE, 
416                         MAP_SHARED, 
417                         fileno(rrd_file), 
418                         0);
419     if (rrd_mmaped_file == MAP_FAILED) {
420         rrd_set_error("error mmapping file %s", filename);
421         free(updvals);
422         free(pdp_temp);
423         free(tmpl_idx);
424         rrd_free(&rrd);
425         fclose(rrd_file);
426         return(-1);
427     }
428 #endif
429     /* loop through the arguments. */
430     for(arg_i=0; arg_i<argc;arg_i++) {
431         char *stepper = strdup(argv[arg_i]);
432         char *step_start = stepper;
433         char *p;
434         char *parsetime_error = NULL;
435         enum {atstyle, normal} timesyntax;
436         struct rrd_time_value ds_tv;
437         if (stepper == NULL){
438                 rrd_set_error("failed duplication argv entry");
439                 free(step_start);
440                 free(updvals);
441                 free(pdp_temp);  
442                 free(tmpl_idx);
443                 rrd_free(&rrd);
444 #ifdef HAVE_MMAP
445                 munmap(rrd_mmaped_file, rrd_filesize);
446 #endif
447                 fclose(rrd_file);
448                 return(-1);
449          }
450         /* initialize all ds input to unknown except the first one
451            which has always got to be set */
452         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
453         updvals[0]=stepper;
454         /* separate all ds elements; first must be examined separately
455            due to alternate time syntax */
456         if ((p=strchr(stepper,'@'))!=NULL) {
457             timesyntax = atstyle;
458             *p = '\0';
459             stepper = p+1;
460         } else if ((p=strchr(stepper,':'))!=NULL) {
461             timesyntax = normal;
462             *p = '\0';
463             stepper = p+1;
464         } else {
465             rrd_set_error("expected timestamp not found in data source from %s",
466                           argv[arg_i]);
467             free(step_start);
468             break;
469         }
470         ii=1;
471         updvals[tmpl_idx[ii]] = stepper;
472         while (*stepper) {
473             if (*stepper == ':') {
474                 *stepper = '\0';
475                 ii++;
476                 if (ii<tmpl_cnt){                   
477                     updvals[tmpl_idx[ii]] = stepper+1;
478                 }
479             }
480             stepper++;
481         }
482
483         if (ii != tmpl_cnt-1) {
484             rrd_set_error("expected %lu data source readings (got %lu) from %s",
485                           tmpl_cnt-1, ii, argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         
490         /* get the time from the reading ... handle N */
491         if (timesyntax == atstyle) {
492             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
493                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
494                 free(step_start);
495                 break;
496             }
497             if (ds_tv.type == RELATIVE_TO_END_TIME ||
498                 ds_tv.type == RELATIVE_TO_START_TIME) {
499                 rrd_set_error("specifying time relative to the 'start' "
500                               "or 'end' makes no sense here: %s",
501                               updvals[0]);
502                 free(step_start);
503                 break;
504             }
505
506             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
507             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
508             
509         } else if (strcmp(updvals[0],"N")==0){
510             gettimeofday(&tmp_time, 0);
511             normalize_time(&tmp_time);
512             current_time = tmp_time.tv_sec;
513             current_time_usec = tmp_time.tv_usec;
514         } else {
515             double tmp;
516             tmp = strtod(updvals[0], 0);
517             current_time = floor(tmp);
518             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
519         }
520         /* dont do any correction for old version RRDs */
521         if(version < 3) 
522             current_time_usec = 0;
523         
524         if(current_time < rrd.live_head->last_up || 
525           (current_time == rrd.live_head->last_up && 
526            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
527             rrd_set_error("illegal attempt to update using time %ld when "
528                           "last update time is %ld (minimum one second step)",
529                           current_time, rrd.live_head->last_up);
530             free(step_start);
531             break;
532         }
533         
534         
535         /* seek to the beginning of the rra's */
536         if (rra_current != rra_begin) {
537 #ifndef HAVE_MMAP
538             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
539                 rrd_set_error("seek error in rrd");
540                 free(step_start);
541                 break;
542             }
543 #endif
544             rra_current = rra_begin;
545         }
546         rra_start = rra_begin;
547
548         /* when was the current pdp started */
549         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
550         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
551
552         /* when did the last pdp_st occur */
553         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
554         occu_pdp_st = current_time - occu_pdp_age;
555
556         /* interval = current_time - rrd.live_head->last_up; */
557         interval    = (double)(current_time - rrd.live_head->last_up) 
558                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
559
560         if (occu_pdp_st > proc_pdp_st){
561             /* OK we passed the pdp_st moment*/
562             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
563                                                               * occurred before the latest
564                                                               * pdp_st moment*/
565             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
566             post_int = occu_pdp_age;                         /* how much after it */
567             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
568         } else {
569             pre_int = interval;
570             post_int = 0;
571         }
572
573 #ifdef DEBUG
574         printf(
575                "proc_pdp_age %lu\t"
576                "proc_pdp_st %lu\t" 
577                "occu_pfp_age %lu\t" 
578                "occu_pdp_st %lu\t"
579                "int %lf\t"
580                "pre_int %lf\t"
581                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
582                 occu_pdp_age, occu_pdp_st,
583                interval, pre_int, post_int);
584 #endif
585     
586         /* process the data sources and update the pdp_prep 
587          * area accordingly */
588         for(i=0;i<rrd.stat_head->ds_cnt;i++){
589             enum dst_en dst_idx;
590             dst_idx= dst_conv(rrd.ds_def[i].dst);
591
592             /* make sure we do not build diffs with old last_ds values */
593             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
594                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
595                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
596                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
597             }
598
599             /* NOTE: DST_CDEF should never enter this if block, because
600              * updvals[i+1][0] is initialized to 'U'; unless the caller
601              * accidently specified a value for the DST_CDEF. To handle 
602               * this case, an extra check is required. */
603
604             if((updvals[i+1][0] != 'U') &&
605                    (dst_idx != DST_CDEF) &&
606                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
607                double rate = DNAN;
608                /* the data source type defines how to process the data */
609                 /* pdp_new contains rate * time ... eg the bytes
610                  * transferred during the interval. Doing it this way saves
611                  * a lot of math operations */
612                 
613
614                 switch(dst_idx){
615                 case DST_COUNTER:
616                 case DST_DERIVE:
617                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
619                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
620                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
621                                  break;
622                             }
623                        }
624                        if (rrd_test_error()){
625                             break;
626                        }
627                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
628                        if(dst_idx == DST_COUNTER) {
629                           /* simple overflow catcher suggested by Andres Kroonmaa */
630                           /* this will fail terribly for non 32 or 64 bit counters ... */
631                           /* are there any others in SNMP land ? */
632                           if (pdp_new[i] < (double)0.0 ) 
633                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
634                           if (pdp_new[i] < (double)0.0 ) 
635                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
636                        }
637                        rate = pdp_new[i] / interval;
638                     }
639                    else {
640                      pdp_new[i]= DNAN;          
641                    }
642                    break;
643                 case DST_ABSOLUTE:
644                     errno = 0;
645                     pdp_new[i] = strtod(updvals[i+1],&endptr);
646                     if (errno > 0){
647                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
648                         break;
649                     };
650                     if (endptr[0] != '\0'){
651                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
652                         break;
653                     }
654                     rate = pdp_new[i] / interval;                 
655                     break;
656                 case DST_GAUGE:
657                     errno = 0;
658                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
659                     if (errno > 0){
660                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661                         break;
662                     };
663                     if (endptr[0] != '\0'){
664                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665                         break;
666                     }
667                     rate = pdp_new[i] / interval;                  
668                     break;
669                 default:
670                     rrd_set_error("rrd contains unknown DS type : '%s'",
671                                   rrd.ds_def[i].dst);
672                     break;
673                 }
674                 /* break out of this for loop if the error string is set */
675                 if (rrd_test_error()){
676                     break;
677                 }
678                /* make sure pdp_temp is neither too large or too small
679                 * if any of these occur it becomes unknown ...
680                 * sorry folks ... */
681                if ( ! isnan(rate) && 
682                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
684                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
686                   pdp_new[i] = DNAN;
687                }               
688             } else {
689                 /* no news is news all the same */
690                 pdp_new[i] = DNAN;
691             }
692
693             
694             /* make a copy of the command line argument for the next run */
695 #ifdef DEBUG
696             fprintf(stderr,
697                     "prep ds[%lu]\t"
698                     "last_arg '%s'\t"
699                     "this_arg '%s'\t"
700                     "pdp_new %10.2f\n",
701                     i,
702                     rrd.pdp_prep[i].last_ds,
703                     updvals[i+1], pdp_new[i]);
704 #endif
705             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
706                 strncpy(rrd.pdp_prep[i].last_ds,
707                         updvals[i+1],LAST_DS_LEN-1);
708                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
709             }
710         }
711         /* break out of the argument parsing loop if the error_string is set */
712         if (rrd_test_error()){
713             free(step_start);
714             break;
715         }
716         /* has a pdp_st moment occurred since the last run ? */
717
718         if (proc_pdp_st == occu_pdp_st){
719             /* no we have not passed a pdp_st moment. therefore update is simple */
720
721             for(i=0;i<rrd.stat_head->ds_cnt;i++){
722                 if(isnan(pdp_new[i])) {            
723                     /* this is not realy accurate if we use subsecond data arival time
724                        should have thought of it when going subsecond resolution ...
725                        sorry next format change we will have it! */
726                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
727                 } else {
728                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
729                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
730                      } else {
731                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
732                      }
733                 }
734 #ifdef DEBUG
735                 fprintf(stderr,
736                         "NO PDP  ds[%lu]\t"
737                         "value %10.2f\t"
738                         "unkn_sec %5lu\n",
739                         i,
740                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
741                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
742 #endif
743             }   
744         } else {
745             /* an pdp_st has occurred. */
746
747             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
748              * occurred up to the last run.        
749             pdp_new[] contains rate*seconds from the latest run.
750             pdp_temp[] will contain the rate for cdp */
751
752             for(i=0;i<rrd.stat_head->ds_cnt;i++){
753                 /* update pdp_prep to the current pdp_st. */
754                 double pre_unknown = 0.0;               
755                 if(isnan(pdp_new[i]))
756                     /* a final bit of unkonwn to be added bevore calculation
757                      * we use a tempaorary variable for this so that we 
758                      * don't have to turn integer lines before using the value */                
759                     pre_unknown = pre_int;
760                 else {
761                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
762                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
763                      } else {
764                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
765                      }
766                  }
767                 
768
769                 /* if too much of the pdp_prep is unknown we dump it */
770                 if ( 
771                     /* removed because this does not agree with the definition
772                        a heart beat can be unknown */
773                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
774                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775                     /* if the interval is larger thatn mrhb we get NAN */
776                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777                     (occu_pdp_st-proc_pdp_st <= 
778                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779                     pdp_temp[i] = DNAN;
780                 } else {
781                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782                         / ((double)(occu_pdp_st - proc_pdp_st
783                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
784                             -pre_unknown);
785                 }
786
787                 /* process CDEF data sources; remember each CDEF DS can
788                  * only reference other DS with a lower index number */
789             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
790                    rpnp_t *rpnp;
791                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
792                    /* substitue data values for OP_VARIABLE nodes */
793                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
794                    {
795                           if (rpnp[ii].op == OP_VARIABLE) {
796                                  rpnp[ii].op = OP_NUMBER;
797                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
798                           }
799                    }
800                    /* run the rpn calculator */
801                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
802                           free(rpnp);
803                           break; /* exits the data sources pdp_temp loop */
804                    }
805                 }
806         
807                 /* make pdp_prep ready for the next run */
808                 if(isnan(pdp_new[i])){
809                     /* this is not realy accurate if we use subsecond data arival time
810                        should have thought of it when going subsecond resolution ...
811                        sorry next format change we will have it! */
812                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
813                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
814                 } else {
815                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
816                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
817                         pdp_new[i]/interval*post_int;
818                 }
819
820 #ifdef DEBUG
821                 fprintf(stderr,
822                         "PDP UPD ds[%lu]\t"
823                         "pdp_temp %10.2f\t"
824                         "new_prep %10.2f\t"
825                         "new_unkn_sec %5lu\n",
826                         i, pdp_temp[i],
827                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
828                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
829 #endif
830             }
831
832                 /* if there were errors during the last loop, bail out here */
833             if (rrd_test_error()){
834                free(step_start);
835                break;
836             }
837
838                 /* compute the number of elapsed pdp_st moments */
839                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
840 #ifdef DEBUG
841                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
842 #endif
843                 if (rra_step_cnt == NULL)
844                 {
845                    rra_step_cnt = (unsigned long *) 
846                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
847                 }
848
849             for(i = 0, rra_start = rra_begin;
850                 i < rrd.stat_head->rra_cnt;
851             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
852                 i++)
853                 {
854                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
855                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
856                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
857         if (start_pdp_offset <= elapsed_pdp_st) {
858            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
859                       rrd.rra_def[i].pdp_cnt + 1;
860             } else {
861                    rra_step_cnt[i] = 0;
862                 }
863
864                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
865                 {
866                    /* If this is a bulk update, we need to skip ahead in the seasonal
867                         * arrays so that they will be correct for the next observed value;
868                         * note that for the bulk update itself, no update will occur to
869                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
870                         * be set to DNAN. */
871            if (rra_step_cnt[i] > 2) 
872                    {
873                           /* skip update by resetting rra_step_cnt[i],
874                            * note that this is not data source specific; this is due
875                            * to the bulk update, not a DNAN value for the specific data
876                            * source. */
877                           rra_step_cnt[i] = 0;
878               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
879                              &last_seasonal_coef);
880                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
881                              &seasonal_coef);
882                    }
883                 
884                   /* periodically run a smoother for seasonal effects */
885                   /* Need to use first cdp parameter buffer to track
886                    * burnin (burnin requires a specific smoothing schedule).
887                    * The CDP_init_seasonal parameter is really an RRA level,
888                    * not a data source within RRA level parameter, but the rra_def
889                    * is read only for rrd_update (not flushed to disk). */
890                   iii = i*(rrd.stat_head -> ds_cnt);
891                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
892                           <= BURNIN_CYCLES)
893                   {
894                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
895                                  > rrd.rra_def[i].row_cnt - 1) {
896                            /* mark off one of the burnin cycles */
897                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
898                        schedule_smooth = 1;
899                          }  
900                   } else {
901                          /* someone has no doubt invented a trick to deal with this
902                           * wrap around, but at least this code is clear. */
903                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
904                              rrd.rra_ptr[i].cur_row)
905                          {
906                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
907                                   * mapping between PDP and CDP */
908                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
909                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
910                                  {
911 #ifdef DEBUG
912                                         fprintf(stderr,
913                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
914                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
915                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
916 #endif
917                                         schedule_smooth = 1;
918                                  }
919              } else {
920                                  /* can't rely on negative numbers because we are working with
921                                   * unsigned values */
922                                  /* Don't need modulus here. If we've wrapped more than once, only
923                                   * one smooth is executed at the end. */
924                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
925                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
926                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
927                                  {
928 #ifdef DEBUG
929                                         fprintf(stderr,
930                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
931                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
932                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
933 #endif
934                                         schedule_smooth = 1;
935                                  }
936                          }
937                   }
938
939               rra_current = ftell(rrd_file); 
940                 } /* if cf is DEVSEASONAL or SEASONAL */
941
942         if (rrd_test_error()) break;
943
944                     /* update CDP_PREP areas */
945                     /* loop over data soures within each RRA */
946                     for(ii = 0;
947                         ii < rrd.stat_head->ds_cnt;
948                         ii++)
949                         {
950                         
951                         /* iii indexes the CDP prep area for this data source within the RRA */
952                         iii=i*rrd.stat_head->ds_cnt+ii;
953
954                         if (rrd.rra_def[i].pdp_cnt > 1) {
955                           
956                            if (rra_step_cnt[i] > 0) {
957                            /* If we are in this block, as least 1 CDP value will be written to
958                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
959                                 * to be written, then the "fill in" value is the CDP_secondary_val
960                                 * entry. */
961                                   if (isnan(pdp_temp[ii]))
962                   {
963                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
964                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
965                                   } else {
966                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
967                                           * CDP data entries. No matter the CF, the value is the same because
968                                           * the average, max, min, and last of a list of identical values is
969                                           * the same, namely, the value itself. */
970                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
971                                   }
972                      
973                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
974                                       > rrd.rra_def[i].pdp_cnt*
975                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
976                                   {
977                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
978                                          /* initialize carry over */
979                                          if (current_cf == CF_AVERAGE) {
980                                                    if (isnan(pdp_temp[ii])) { 
981                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
982                                                    } else {
983                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
985                                                    }
986                                          } else {
987                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
988                                          }
989                                   } else {
990                                          rrd_value_t cum_val, cur_val; 
991                                      switch (current_cf) {
992                                                 case CF_AVERAGE:
993                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
994                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
995                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
996                                                (cum_val + cur_val * start_pdp_offset) /
997                                            (rrd.rra_def[i].pdp_cnt
998                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
999                                                    /* initialize carry over value */
1000                                                    if (isnan(pdp_temp[ii])) { 
1001                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1002                                                    } else {
1003                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1004                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1005                                                    }
1006                                                    break;
1007                                                 case CF_MAXIMUM:
1008                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1009                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1010 #ifdef DEBUG
1011                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1012                                                           isnan(pdp_temp[ii])) {
1013                                                      fprintf(stderr,
1014                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1015                                                                 i,ii);
1016                                                          exit(-1);
1017                                                   }
1018 #endif
1019                                                   if (cur_val > cum_val)
1020                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1021                                                   else
1022                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1023                                                   /* initialize carry over value */
1024                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1025                                                   break;
1026                                                 case CF_MINIMUM:
1027                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1028                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1029 #ifdef DEBUG
1030                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1031                                                           isnan(pdp_temp[ii])) {
1032                                                      fprintf(stderr,
1033                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1034                                                                 i,ii);
1035                                                          exit(-1);
1036                                                   }
1037 #endif
1038                                                   if (cur_val < cum_val)
1039                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1040                                                   else
1041                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1042                                                   /* initialize carry over value */
1043                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1044                                                   break;
1045                                                 case CF_LAST:
1046                                                 default:
1047                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1048                                                    /* initialize carry over value */
1049                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1050                                                 break;
1051                                          }
1052                                   } /* endif meets xff value requirement for a valid value */
1053                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1054                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1055                                   if (isnan(pdp_temp[ii]))
1056                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1057                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1058                                   else
1059                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1060                } else  /* rra_step_cnt[i]  == 0 */
1061                            {
1062 #ifdef DEBUG
1063                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1064                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1065                                          i,ii);
1066                                   } else {
1067                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1068                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1069                                   }
1070 #endif
1071                                   if (isnan(pdp_temp[ii])) {
1072                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1073                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1074                                   {
1075                                          if (current_cf == CF_AVERAGE) {
1076                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1077                                                    elapsed_pdp_st;
1078                                          } else {
1079                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1080                                          }
1081 #ifdef DEBUG
1082                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1083                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1084 #endif
1085                                   } else {
1086                                          switch (current_cf) {
1087                                          case CF_AVERAGE:
1088                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1089                                                    elapsed_pdp_st;
1090                                                 break;
1091                                          case CF_MINIMUM:
1092                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1093                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094                                                 break; 
1095                                          case CF_MAXIMUM:
1096                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1097                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098                                                 break; 
1099                                          case CF_LAST:
1100                                          default:
1101                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1102                                                 break;
1103                                          }
1104                                   }
1105                            }
1106                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1107                            if (elapsed_pdp_st > 2)
1108                            {
1109                                    switch (current_cf) {
1110                                    case CF_AVERAGE:
1111                                    default:
1112                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1113                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1114                                           break;
1115                    case CF_SEASONAL:
1116                                    case CF_DEVSEASONAL:
1117                                           /* need to update cached seasonal values, so they are consistent
1118                                            * with the bulk update */
1119                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1120                                            * CDP_last_deviation are the same. */
1121                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1122                                                  last_seasonal_coef[ii];
1123                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1124                                                  seasonal_coef[ii];
1125                                           break;
1126                    case CF_HWPREDICT:
1127                                           /* need to update the null_count and last_null_count.
1128                                            * even do this for non-DNAN pdp_temp because the
1129                                            * algorithm is not learning from batch updates. */
1130                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1131                                                  elapsed_pdp_st;
1132                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1133                                                  elapsed_pdp_st - 1;
1134                                           /* fall through */
1135                                    case CF_DEVPREDICT:
1136                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1137                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1138                                           break;
1139                    case CF_FAILURES:
1140                                           /* do not count missed bulk values as failures */
1141                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1142                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1143                                           /* need to reset violations buffer.
1144                                            * could do this more carefully, but for now, just
1145                                            * assume a bulk update wipes away all violations. */
1146                       erase_violations(&rrd, iii, i);
1147                                           break;
1148                                    }
1149                            } 
1150                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1151
1152                         if (rrd_test_error()) break;
1153
1154                         } /* endif data sources loop */
1155         } /* end RRA Loop */
1156
1157                 /* this loop is only entered if elapsed_pdp_st < 3 */
1158                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1159                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1160                 {
1161                for(i = 0, rra_start = rra_begin;
1162                    i < rrd.stat_head->rra_cnt;
1163                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1164                    i++)
1165                    {
1166                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1167
1168                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1169                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1170                           {
1171                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1172                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1173                                 &seasonal_coef);
1174                  rra_current = ftell(rrd_file);
1175                           }
1176                           if (rrd_test_error()) break;
1177                       /* loop over data soures within each RRA */
1178                       for(ii = 0;
1179                           ii < rrd.stat_head->ds_cnt;
1180                           ii++)
1181                           {
1182                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1183                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1184                                     scratch_idx, seasonal_coef);
1185                           }
1186            } /* end RRA Loop */
1187                    if (rrd_test_error()) break;
1188             } /* end elapsed_pdp_st loop */
1189
1190                 if (rrd_test_error()) break;
1191
1192                 /* Ready to write to disk */
1193                 /* Move sequentially through the file, writing one RRA at a time.
1194                  * Note this architecture divorces the computation of CDP with
1195                  * flushing updated RRA entries to disk. */
1196             for(i = 0, rra_start = rra_begin;
1197                 i < rrd.stat_head->rra_cnt;
1198             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1199                 i++) {
1200                 /* is there anything to write for this RRA? If not, continue. */
1201         if (rra_step_cnt[i] == 0) continue;
1202
1203                 /* write the first row */
1204 #ifdef DEBUG
1205         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1206 #endif
1207             rrd.rra_ptr[i].cur_row++;
1208             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1209                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1210                 /* positition on the first row */
1211                 rra_pos_tmp = rra_start +
1212                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1213                 if(rra_pos_tmp != rra_current) {
1214 #ifndef HAVE_MMAP
1215                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1216                       rrd_set_error("seek error in rrd");
1217                       break;
1218                    }
1219 #endif
1220                    rra_current = rra_pos_tmp;
1221                 }
1222
1223 #ifdef DEBUG
1224             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1225 #endif
1226                 scratch_idx = CDP_primary_val;
1227                 if (pcdp_summary != NULL)
1228                 {
1229                    rra_time = (current_time - current_time 
1230                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1231                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1232                 }
1233 #ifdef HAVE_MMAP
1234                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1235                    pcdp_summary, &rra_time, rrd_mmaped_file);
1236 #else
1237                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1238                    pcdp_summary, &rra_time);
1239 #endif
1240                 if (rrd_test_error()) break;
1241
1242                 /* write other rows of the bulk update, if any */
1243                 scratch_idx = CDP_secondary_val;
1244                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1245                 {
1246                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1247                    {
1248 #ifdef DEBUG
1249               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1250                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1251 #endif
1252                           /* wrap */
1253                           rrd.rra_ptr[i].cur_row = 0;
1254                           /* seek back to beginning of current rra */
1255                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1256                           {
1257                          rrd_set_error("seek error in rrd");
1258                          break;
1259                           }
1260 #ifdef DEBUG
1261                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1262 #endif
1263                           rra_current = rra_start;
1264                    }
1265                    if (pcdp_summary != NULL)
1266                    {
1267                       rra_time = (current_time - current_time 
1268                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1269                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1270                    }
1271 #ifdef HAVE_MMAP
1272                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1273                       pcdp_summary, &rra_time, rrd_mmaped_file);
1274 #else
1275                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1276                       pcdp_summary, &rra_time);
1277 #endif
1278                 }
1279                 
1280                 if (rrd_test_error())
1281                   break;
1282                 } /* RRA LOOP */
1283
1284             /* break out of the argument parsing loop if error_string is set */
1285             if (rrd_test_error()){
1286                    free(step_start);
1287                    break;
1288             } 
1289             
1290         } /* endif a pdp_st has occurred */ 
1291         rrd.live_head->last_up = current_time;
1292         rrd.live_head->last_up_usec = current_time_usec; 
1293         free(step_start);
1294     } /* function argument loop */
1295
1296     if (seasonal_coef != NULL) free(seasonal_coef);
1297     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1298         if (rra_step_cnt != NULL) free(rra_step_cnt);
1299     rpnstack_free(&rpnstack);
1300
1301 #ifdef HAVE_MMAP
1302     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1303             rrd_set_error("error writing(unmapping) file: %s", filename);
1304     }
1305 #endif    
1306     /* if we got here and if there is an error and if the file has not been
1307      * written to, then close things up and return. */
1308     if (rrd_test_error()) {
1309         free(updvals);
1310         free(tmpl_idx);
1311         rrd_free(&rrd);
1312         free(pdp_temp);
1313         free(pdp_new);
1314         fclose(rrd_file);
1315         return(-1);
1316     }
1317
1318     /* aargh ... that was tough ... so many loops ... anyway, its done.
1319      * we just need to write back the live header portion now*/
1320
1321     if (fseek(rrd_file, (sizeof(stat_head_t)
1322                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1323                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1324               SEEK_SET) != 0) {
1325         rrd_set_error("seek rrd for live header writeback");
1326         free(updvals);
1327         free(tmpl_idx);
1328         rrd_free(&rrd);
1329         free(pdp_temp);
1330         free(pdp_new);
1331         fclose(rrd_file);
1332         return(-1);
1333     }
1334
1335     if(version >= 3) {
1336             if(fwrite( rrd.live_head,
1337                        sizeof(live_head_t), 1, rrd_file) != 1){
1338                 rrd_set_error("fwrite live_head to rrd");
1339                 free(updvals);
1340                 rrd_free(&rrd);
1341                 free(tmpl_idx);
1342                 free(pdp_temp);
1343                 free(pdp_new);
1344                 fclose(rrd_file);
1345                 return(-1);
1346             }
1347     }
1348     else {
1349             if(fwrite( &rrd.live_head->last_up,
1350                        sizeof(time_t), 1, rrd_file) != 1){
1351                 rrd_set_error("fwrite live_head to rrd");
1352                 free(updvals);
1353                 rrd_free(&rrd);
1354                 free(tmpl_idx);
1355                 free(pdp_temp);
1356                 free(pdp_new);
1357                 fclose(rrd_file);
1358                 return(-1);
1359             }
1360     }
1361             
1362
1363     if(fwrite( rrd.pdp_prep,
1364                sizeof(pdp_prep_t),
1365                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1366         rrd_set_error("ftwrite pdp_prep to rrd");
1367         free(updvals);
1368         rrd_free(&rrd);
1369         free(tmpl_idx);
1370         free(pdp_temp);
1371         free(pdp_new);
1372         fclose(rrd_file);
1373         return(-1);
1374     }
1375
1376     if(fwrite( rrd.cdp_prep,
1377                sizeof(cdp_prep_t),
1378                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1379        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1380
1381         rrd_set_error("ftwrite cdp_prep to rrd");
1382         free(updvals);
1383         free(tmpl_idx);
1384         rrd_free(&rrd);
1385         free(pdp_temp);
1386         free(pdp_new);
1387         fclose(rrd_file);
1388         return(-1);
1389     }
1390
1391     if(fwrite( rrd.rra_ptr,
1392                sizeof(rra_ptr_t), 
1393                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1394         rrd_set_error("fwrite rra_ptr to rrd");
1395         free(updvals);
1396         free(tmpl_idx);
1397         rrd_free(&rrd);
1398         free(pdp_temp);
1399         free(pdp_new);
1400         fclose(rrd_file);
1401         return(-1);
1402     }
1403
1404     /* OK now close the files and free the memory */
1405     if(fclose(rrd_file) != 0){
1406         rrd_set_error("closing rrd");
1407         free(updvals);
1408         free(tmpl_idx);
1409         rrd_free(&rrd);
1410         free(pdp_temp);
1411         free(pdp_new);
1412         return(-1);
1413     }
1414
1415     /* calling the smoothing code here guarantees at most
1416          * one smoothing operation per rrd_update call. Unfortunately,
1417          * it is possible with bulk updates, or a long-delayed update
1418          * for smoothing to occur off-schedule. This really isn't
1419          * critical except during the burning cycles. */
1420         if (schedule_smooth)
1421         {
1422           rrd_file = fopen(filename,"rb+");
1423           rra_start = rra_begin;
1424           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1425           {
1426             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1427                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1428             {
1429 #ifdef DEBUG
1430               fprintf(stderr,"Running smoother for rra %ld\n",i);
1431 #endif
1432               apply_smoother(&rrd,i,rra_start,rrd_file);
1433               if (rrd_test_error())
1434                 break;
1435             }
1436             rra_start += rrd.rra_def[i].row_cnt
1437               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1438           }
1439           fclose(rrd_file);
1440         }
1441     rrd_free(&rrd);
1442     free(updvals);
1443     free(tmpl_idx);
1444     free(pdp_new);
1445     free(pdp_temp);
1446     return(0);
1447 }
1448
1449 /*
1450  * get exclusive lock to whole file.
1451  * lock gets removed when we close the file
1452  *
1453  * returns 0 on success
1454  */
1455 int
1456 LockRRD(FILE *rrdfile)
1457 {
1458     int rrd_fd;         /* File descriptor for RRD */
1459     int rcstat;
1460
1461     rrd_fd = fileno(rrdfile);
1462
1463         {
1464 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1465     struct _stat st;
1466
1467     if ( _fstat( rrd_fd, &st ) == 0 ) {
1468             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1469     } else {
1470             rcstat = -1;
1471     }
1472 #else
1473     struct flock        lock;
1474     lock.l_type = F_WRLCK;    /* exclusive write lock */
1475     lock.l_len = 0;           /* whole file */
1476     lock.l_start = 0;         /* start of file */
1477     lock.l_whence = SEEK_SET;   /* end of file */
1478
1479     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1480 #endif
1481         }
1482
1483     return(rcstat);
1484 }
1485
1486
1487 #ifdef HAVE_MMAP
1488 info_t
1489 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1490                unsigned short CDP_scratch_idx, 
1491 #ifndef DEBUG
1492 FILE UNUSED(*rrd_file),
1493 #else
1494 FILE *rrd_file,
1495 #endif
1496                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1497 #else
1498 info_t
1499 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1500                unsigned short CDP_scratch_idx, FILE *rrd_file,
1501                    info_t *pcdp_summary, time_t *rra_time)
1502 #endif
1503 {
1504    unsigned long ds_idx, cdp_idx;
1505    infoval iv;
1506   
1507    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1508    {
1509       /* compute the cdp index */
1510       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1511 #ifdef DEBUG
1512           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1513              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1514              rrd -> rra_def[rra_idx].cf_nam);
1515 #endif 
1516       if (pcdp_summary != NULL)
1517           {
1518              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1519              /* append info to the return hash */
1520                  pcdp_summary = info_push(pcdp_summary,
1521                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1522                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1523                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1524          RD_I_VAL, iv);
1525           }
1526 #ifdef HAVE_MMAP
1527           memcpy((char *)rrd_mmaped_file + *rra_current,
1528                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1529                           sizeof(rrd_value_t));
1530 #else
1531           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1532                  sizeof(rrd_value_t),1,rrd_file) != 1)
1533           { 
1534              rrd_set_error("writing rrd");
1535              return 0;
1536           }
1537 #endif
1538           *rra_current += sizeof(rrd_value_t);
1539         }
1540         return (pcdp_summary);
1541 }