resolve subsecond resolution logging and handling of unknown values by
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.11  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *template, int argc, char **argv);
91 int _rrd_update(char *filename, char *template, int argc, char **argv, 
92                                         info_t*);
93
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
95
96
97 #ifdef STANDALONE
98 int 
99 main(int argc, char **argv){
100         rrd_update(argc,argv);
101         if (rrd_test_error()) {
102                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
103                         "Usage: rrdupdate filename\n"
104                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105                         "\t\t\ttime|N:value[:value...]\n\n"
106                         "\t\t\tat-time@value[:value...]\n\n"
107                         "\t\t\t[ time:value[:value...] ..]\n\n");
108                                    
109                 printf("ERROR: %s\n",rrd_get_error());
110                 rrd_clear_error();                                                            
111                 return 1;
112         }
113         return 0;
114 }
115 #endif
116
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119     char             *template = NULL;          
120         info_t *result = NULL;
121         infoval rc;
122     optind = 0; opterr = 0;  /* initialize getopt */
123
124     while (1) {
125                 static struct option long_options[] =
126                         {
127                                 {"template",      required_argument, 0, 't'},
128                                 {0,0,0,0}
129                         };
130                 int option_index = 0;
131                 int opt;
132                 opt = getopt_long(argc, argv, "t:", 
133                                                   long_options, &option_index);
134                 
135                 if (opt == EOF)
136                         break;
137                 
138                 switch(opt) {
139                 case 't':
140                         template = optarg;
141                         break;
142                 
143                 case '?':
144                         rrd_set_error("unknown option '%s'",argv[optind-1]);
145             rc.u_int = -1;
146                         goto end_tag;
147                 }
148     }
149
150     /* need at least 2 arguments: filename, data. */
151     if (argc-optind < 2) {
152                 rrd_set_error("Not enough arguments");
153         rc.u_int = -1;
154                 goto end_tag;
155     }
156     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157         rc.u_int = _rrd_update(argv[optind], template,
158                       argc - optind - 1, argv + optind + 1, result);
159     result->value.u_int = rc.u_int;
160 end_tag:
161     return result;
162 }
163
164 int
165 rrd_update(int argc, char **argv)
166 {
167     char             *template = NULL;          
168     int rc;
169     optind = 0; opterr = 0;  /* initialize getopt */
170
171     while (1) {
172                 static struct option long_options[] =
173                         {
174                                 {"template",      required_argument, 0, 't'},
175                                 {0,0,0,0}
176                         };
177                 int option_index = 0;
178                 int opt;
179                 opt = getopt_long(argc, argv, "t:", 
180                                                   long_options, &option_index);
181                 
182                 if (opt == EOF)
183                         break;
184                 
185                 switch(opt) {
186                 case 't':
187                         template = optarg;
188                         break;
189                 
190                 case '?':
191                         rrd_set_error("unknown option '%s'",argv[optind-1]);
192                         return(-1);
193                 }
194     }
195
196     /* need at least 2 arguments: filename, data. */
197     if (argc-optind < 2) {
198                 rrd_set_error("Not enough arguments");
199
200                 return -1;
201     }
202  
203         rc = rrd_update_r(argv[optind], template,
204                       argc - optind - 1, argv + optind + 1);
205     return rc;
206 }
207
208 int
209 rrd_update_r(char *filename, char *template, int argc, char **argv)
210 {
211    return _rrd_update(filename, template, argc, argv, NULL);
212 }
213
214 int
215 _rrd_update(char *filename, char *template, int argc, char **argv, 
216    info_t *pcdp_summary)
217 {
218
219     int              arg_i = 2;
220     short            j;
221     unsigned long    i,ii,iii=1;
222
223     unsigned long    rra_begin;          /* byte pointer to the rra
224                                           * area in the rrd file.  this
225                                           * pointer never changes value */
226     unsigned long    rra_start;          /* byte pointer to the rra
227                                           * area in the rrd file.  this
228                                           * pointer changes as each rrd is
229                                           * processed. */
230     unsigned long    rra_current;        /* byte pointer to the current write
231                                           * spot in the rrd file. */
232     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
233     double           interval,
234         pre_int,post_int;                /* interval between this and
235                                           * the last run */
236     unsigned long    proc_pdp_st;        /* which pdp_st was the last
237                                           * to be processed */
238     unsigned long    occu_pdp_st;        /* when was the pdp_st
239                                           * before the last update
240                                           * time */
241     unsigned long    proc_pdp_age;       /* how old was the data in
242                                           * the pdp prep area when it
243                                           * was last updated */
244     unsigned long    occu_pdp_age;       /* how long ago was the last
245                                           * pdp_step time */
246     rrd_value_t      *pdp_new;           /* prepare the incoming data
247                                           * to be added the the
248                                           * existing entry */
249     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
250                                           * to be added the the
251                                           * cdp values */
252
253     long             *tmpl_idx;          /* index representing the settings
254                                             transported by the template index */
255     unsigned long    tmpl_cnt = 2;       /* time and data */
256
257     FILE             *rrd_file;
258     rrd_t            rrd;
259     time_t           current_time = 0;
260     time_t           rra_time = 0;       /* time of update for a RRA */
261     unsigned long    current_time_usec=0;/* microseconds part of current time */
262     struct timeval   tmp_time;           /* used for time conversion */
263
264     char             **updvals;
265     int              schedule_smooth = 0;
266         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267                                          /* a vector of future Holt-Winters seasonal coefs */
268     unsigned long    elapsed_pdp_st;
269                                          /* number of elapsed PDP steps since last update */
270     unsigned long    *rra_step_cnt = NULL;
271                                          /* number of rows to be updated in an RRA for a data
272                                           * value. */
273     unsigned long    start_pdp_offset;
274                                          /* number of PDP steps since the last update that
275                                           * are assigned to the first CDP to be generated
276                                           * since the last update. */
277     unsigned short   scratch_idx;
278                                          /* index into the CDP scratch array */
279     enum cf_en       current_cf;
280                                          /* numeric id of the current consolidation function */
281     rpnstack_t       rpnstack; /* used for COMPUTE DS */
282     int              version;  /* rrd version */
283     char             *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285     void             *rrd_mmaped_file;
286     unsigned long    rrd_filesize;
287 #endif
288
289     rpnstack_init(&rpnstack);
290
291     /* need at least 1 arguments: data. */
292     if (argc < 1) {
293         rrd_set_error("Not enough arguments");
294         return -1;
295     }
296     
297     
298
299     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300         return -1;
301     }
302     /* initialize time */
303     version = atoi(rrd.stat_head->version);
304     gettimeofday(&tmp_time, 0);
305     normalize_time(&tmp_time);
306     current_time = tmp_time.tv_sec;
307     if(version >= 3) {
308         current_time_usec = tmp_time.tv_usec;
309     }
310     else {
311         current_time_usec = 0;
312     }
313
314     rra_current = rra_start = rra_begin = ftell(rrd_file);
315     /* This is defined in the ANSI C standard, section 7.9.5.3:
316
317         When a file is opened with udpate mode ('+' as the second
318         or third character in the ... list of mode argument
319         variables), both input and ouptut may be performed on the
320         associated stream.  However, ...  input may not be directly
321         followed by output without an intervening call to a file
322         positioning function, unless the input oepration encounters
323         end-of-file. */
324 #ifdef HAVE_MMAP
325     fseek(rrd_file, 0, SEEK_END);
326     rrd_filesize = ftell(rrd_file);
327     fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329     fseek(rrd_file, 0, SEEK_CUR);
330 #endif
331
332     
333     /* get exclusive lock to whole file.
334      * lock gets removed when we close the file.
335      */
336     if (LockRRD(rrd_file) != 0) {
337       rrd_set_error("could not lock RRD");
338       rrd_free(&rrd);
339       fclose(rrd_file);
340       return(-1);   
341     } 
342
343     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating updvals pointer array");
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
349
350     if ((pdp_temp = malloc(sizeof(rrd_value_t)
351                            *rrd.stat_head->ds_cnt))==NULL){
352         rrd_set_error("allocating pdp_temp ...");
353         free(updvals);
354         rrd_free(&rrd);
355         fclose(rrd_file);
356         return(-1);
357     }
358
359     if ((tmpl_idx = malloc(sizeof(unsigned long)
360                            *(rrd.stat_head->ds_cnt+1)))==NULL){
361         rrd_set_error("allocating tmpl_idx ...");
362         free(pdp_temp);
363         free(updvals);
364         rrd_free(&rrd);
365         fclose(rrd_file);
366         return(-1);
367     }
368     /* initialize template redirector */
369     /* default config example (assume DS 1 is a CDEF DS)
370        tmpl_idx[0] -> 0; (time)
371        tmpl_idx[1] -> 1; (DS 0)
372        tmpl_idx[2] -> 3; (DS 2)
373        tmpl_idx[3] -> 4; (DS 3) */
374     tmpl_idx[0] = 0; /* time */
375     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
376         {
377            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378               tmpl_idx[ii++]=i;
379         }
380     tmpl_cnt= ii;
381
382     if (template) {
383         char *dsname;
384         unsigned int tmpl_len;
385         dsname = template;
386         tmpl_cnt = 1; /* the first entry is the time */
387         tmpl_len = strlen(template);
388         for(i=0;i<=tmpl_len ;i++) {
389             if (template[i] == ':' || template[i] == '\0') {
390                 template[i] = '\0';
391                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
392                     rrd_set_error("Template contains more DS definitions than RRD");
393                     free(updvals); free(pdp_temp);
394                     free(tmpl_idx); rrd_free(&rrd);
395                     fclose(rrd_file); return(-1);
396                 }
397                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
398                     rrd_set_error("unknown DS name '%s'",dsname);
399                     free(updvals); free(pdp_temp);
400                     free(tmpl_idx); rrd_free(&rrd);
401                     fclose(rrd_file); return(-1);
402                 } else {
403                   /* the first element is always the time */
404                   tmpl_idx[tmpl_cnt-1]++; 
405                   /* go to the next entry on the template */
406                   dsname = &template[i+1];
407                   /* fix the damage we did before */
408                   if (i<tmpl_len) {
409                      template[i]=':';
410                   } 
411
412                 }
413             }       
414         }
415     }
416     if ((pdp_new = malloc(sizeof(rrd_value_t)
417                           *rrd.stat_head->ds_cnt))==NULL){
418         rrd_set_error("allocating pdp_new ...");
419         free(updvals);
420         free(pdp_temp);
421         free(tmpl_idx);
422         rrd_free(&rrd);
423         fclose(rrd_file);
424         return(-1);
425     }
426
427 #ifdef HAVE_MMAP
428     rrd_mmaped_file = mmap(0, 
429                         rrd_filesize, 
430                         PROT_READ | PROT_WRITE, 
431                         MAP_SHARED, 
432                         fileno(rrd_file), 
433                         0);
434     if (rrd_mmaped_file == MAP_FAILED) {
435         rrd_set_error("error mmapping file %s", filename);
436         free(updvals);
437         free(pdp_temp);
438         free(tmpl_idx);
439         rrd_free(&rrd);
440         fclose(rrd_file);
441         return(-1);
442     }
443 #endif
444     /* loop through the arguments. */
445     for(arg_i=0; arg_i<argc;arg_i++) {
446         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
447         char *step_start = stepper;
448         char *p;
449         char *parsetime_error = NULL;
450         enum {atstyle, normal} timesyntax;
451         struct rrd_time_value ds_tv;
452         if (stepper == NULL){
453                 rrd_set_error("failed duplication argv entry");
454                 free(updvals);
455                 free(pdp_temp);  
456                 free(tmpl_idx);
457                 rrd_free(&rrd);
458 #ifdef HAVE_MMAP
459                 munmap(rrd_mmaped_file, rrd_filesize);
460 #endif
461                 fclose(rrd_file);
462                 return(-1);
463          }
464         /* initialize all ds input to unknown except the first one
465            which has always got to be set */
466         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467         strcpy(stepper,argv[arg_i]);
468         updvals[0]=stepper;
469         /* separate all ds elements; first must be examined separately
470            due to alternate time syntax */
471         if ((p=strchr(stepper,'@'))!=NULL) {
472             timesyntax = atstyle;
473             *p = '\0';
474             stepper = p+1;
475         } else if ((p=strchr(stepper,':'))!=NULL) {
476             timesyntax = normal;
477             *p = '\0';
478             stepper = p+1;
479         } else {
480             rrd_set_error("expected timestamp not found in data source from %s:...",
481                           argv[arg_i]);
482             free(step_start);
483             break;
484         }
485         ii=1;
486         updvals[tmpl_idx[ii]] = stepper;
487         while (*stepper) {
488             if (*stepper == ':') {
489                 *stepper = '\0';
490                 ii++;
491                 if (ii<tmpl_cnt){                   
492                     updvals[tmpl_idx[ii]] = stepper+1;
493                 }
494             }
495             stepper++;
496         }
497
498         if (ii != tmpl_cnt-1) {
499             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500                           tmpl_cnt-1, ii, argv[arg_i]);
501             free(step_start);
502             break;
503         }
504         
505         /* get the time from the reading ... handle N */
506         if (timesyntax == atstyle) {
507             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
509                 free(step_start);
510                 break;
511             }
512             if (ds_tv.type == RELATIVE_TO_END_TIME ||
513                 ds_tv.type == RELATIVE_TO_START_TIME) {
514                 rrd_set_error("specifying time relative to the 'start' "
515                               "or 'end' makes no sense here: %s",
516                               updvals[0]);
517                 free(step_start);
518                 break;
519             }
520
521             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
523             
524         } else if (strcmp(updvals[0],"N")==0){
525             gettimeofday(&tmp_time, 0);
526             normalize_time(&tmp_time);
527             current_time = tmp_time.tv_sec;
528             current_time_usec = tmp_time.tv_usec;
529         } else {
530             double tmp;
531             tmp = strtod(updvals[0], 0);
532             current_time = floor(tmp);
533             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
534         }
535         /* dont do any correction for old version RRDs */
536         if(version < 3) 
537             current_time_usec = 0;
538         
539         if(current_time < rrd.live_head->last_up || 
540           (current_time == rrd.live_head->last_up && 
541            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
542             rrd_set_error("illegal attempt to update using time %ld when "
543                           "last update time is %ld (minimum one second step)",
544                           current_time, rrd.live_head->last_up);
545             free(step_start);
546             break;
547         }
548         
549         
550         /* seek to the beginning of the rra's */
551         if (rra_current != rra_begin) {
552 #ifndef HAVE_MMAP
553             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
554                 rrd_set_error("seek error in rrd");
555                 free(step_start);
556                 break;
557             }
558 #endif
559             rra_current = rra_begin;
560         }
561         rra_start = rra_begin;
562
563         /* when was the current pdp started */
564         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
566
567         /* when did the last pdp_st occur */
568         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569         occu_pdp_st = current_time - occu_pdp_age;
570
571         /* interval = current_time - rrd.live_head->last_up; */
572         interval    = (double)(current_time - rrd.live_head->last_up) 
573                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
574
575         if (occu_pdp_st > proc_pdp_st){
576             /* OK we passed the pdp_st moment*/
577             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
578                                                               * occurred before the latest
579                                                               * pdp_st moment*/
580             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
581             post_int = occu_pdp_age;                         /* how much after it */
582             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
583         } else {
584             pre_int = interval;
585             post_int = 0;
586         }
587
588 #ifdef DEBUG
589         printf(
590                "proc_pdp_age %lu\t"
591                "proc_pdp_st %lu\t" 
592                "occu_pfp_age %lu\t" 
593                "occu_pdp_st %lu\t"
594                "int %lf\t"
595                "pre_int %lf\t"
596                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
597                 occu_pdp_age, occu_pdp_st,
598                interval, pre_int, post_int);
599 #endif
600     
601         /* process the data sources and update the pdp_prep 
602          * area accordingly */
603         for(i=0;i<rrd.stat_head->ds_cnt;i++){
604             enum dst_en dst_idx;
605             dst_idx= dst_conv(rrd.ds_def[i].dst);
606
607             /* make sure we do not build diffs with old last_ds values */
608             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
609                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
610                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
611             }
612
613             /* NOTE: DST_CDEF should never enter this if block, because
614              * updvals[i+1][0] is initialized to 'U'; unless the caller
615              * accidently specified a value for the DST_CDEF. To handle 
616               * this case, an extra check is required. */
617
618             if((updvals[i+1][0] != 'U') &&
619                    (dst_idx != DST_CDEF) &&
620                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
621                double rate = DNAN;
622                /* the data source type defines how to process the data */
623                 /* pdp_new contains rate * time ... eg the bytes
624                  * transferred during the interval. Doing it this way saves
625                  * a lot of math operations */
626                 
627
628                 switch(dst_idx){
629                 case DST_COUNTER:
630                 case DST_DERIVE:
631                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
632                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
633                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
634                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
635                                  break;
636                             }
637                        }
638                        if (rrd_test_error()){
639                             break;
640                        }
641                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
642                        if(dst_idx == DST_COUNTER) {
643                           /* simple overflow catcher suggested by Andres Kroonmaa */
644                           /* this will fail terribly for non 32 or 64 bit counters ... */
645                           /* are there any others in SNMP land ? */
646                           if (pdp_new[i] < (double)0.0 ) 
647                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
648                           if (pdp_new[i] < (double)0.0 ) 
649                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
650                        }
651                        rate = pdp_new[i] / interval;
652                     }
653                    else {
654                      pdp_new[i]= DNAN;          
655                    }
656                    break;
657                 case DST_ABSOLUTE:
658                     errno = 0;
659                     pdp_new[i] = strtod(updvals[i+1],&endptr);
660                     if (errno > 0){
661                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
662                         break;
663                     };
664                     if (endptr[0] != '\0'){
665                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
666                         break;
667                     }
668                     rate = pdp_new[i] / interval;                 
669                     break;
670                 case DST_GAUGE:
671                     errno = 0;
672                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
673                     if (errno > 0){
674                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
675                         break;
676                     };
677                     if (endptr[0] != '\0'){
678                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
679                         break;
680                     }
681                     rate = pdp_new[i] / interval;                  
682                     break;
683                 default:
684                     rrd_set_error("rrd contains unknown DS type : '%s'",
685                                   rrd.ds_def[i].dst);
686                     break;
687                 }
688                 /* break out of this for loop if the error string is set */
689                 if (rrd_test_error()){
690                     break;
691                 }
692                /* make sure pdp_temp is neither too large or too small
693                 * if any of these occur it becomes unknown ...
694                 * sorry folks ... */
695                if ( ! isnan(rate) && 
696                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
697                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
698                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
699                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
700                   pdp_new[i] = DNAN;
701                }               
702             } else {
703                 /* no news is news all the same */
704                 pdp_new[i] = DNAN;
705             }
706             
707             /* make a copy of the command line argument for the next run */
708 #ifdef DEBUG
709             fprintf(stderr,
710                     "prep ds[%lu]\t"
711                     "last_arg '%s'\t"
712                     "this_arg '%s'\t"
713                     "pdp_new %10.2f\n",
714                     i,
715                     rrd.pdp_prep[i].last_ds,
716                     updvals[i+1], pdp_new[i]);
717 #endif
718             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
719                 strncpy(rrd.pdp_prep[i].last_ds,
720                         updvals[i+1],LAST_DS_LEN-1);
721                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
722             }
723         }
724         /* break out of the argument parsing loop if the error_string is set */
725         if (rrd_test_error()){
726             free(step_start);
727             break;
728         }
729         /* has a pdp_st moment occurred since the last run ? */
730
731         if (proc_pdp_st == occu_pdp_st){
732             /* no we have not passed a pdp_st moment. therefore update is simple */
733
734             for(i=0;i<rrd.stat_head->ds_cnt;i++){
735                 if(isnan(pdp_new[i]))
736                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
737                 else {
738                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
739                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
740                      } else {
741                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
742                      }
743                 }
744 #ifdef DEBUG
745                 fprintf(stderr,
746                         "NO PDP  ds[%lu]\t"
747                         "value %10.2f\t"
748                         "unkn_sec %5lu\n",
749                         i,
750                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
751                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
752 #endif
753             }   
754         } else {
755             /* an pdp_st has occurred. */
756
757             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
758              * occurred up to the last run.        
759             pdp_new[] contains rate*seconds from the latest run.
760             pdp_temp[] will contain the rate for cdp */
761
762             for(i=0;i<rrd.stat_head->ds_cnt;i++){
763                 /* update pdp_prep to the current pdp_st. */
764                 
765                 if(isnan(pdp_new[i]))
766                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
767                 else {
768                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
769                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
770                      } else {
771                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
772                      }
773                  }
774                 
775
776                 /* if too much of the pdp_prep is unknown we dump it */
777                 if ( 
778                     /* removed because this does not agree with the definition
779                        a heart beat can be unknown */
780                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
781                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
782                     (occu_pdp_st-proc_pdp_st <= 
783                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
784                     pdp_temp[i] = DNAN;
785                 } else {
786                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787                         / (double)( occu_pdp_st
788                                     - proc_pdp_st
789                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
790                 }
791
792                 /* process CDEF data sources; remember each CDEF DS can
793                  * only reference other DS with a lower index number */
794             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795                    rpnp_t *rpnp;
796                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797                    /* substitue data values for OP_VARIABLE nodes */
798                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
799                    {
800                           if (rpnp[ii].op == OP_VARIABLE) {
801                                  rpnp[ii].op = OP_NUMBER;
802                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
803                           }
804                    }
805                    /* run the rpn calculator */
806                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
807                           free(rpnp);
808                           break; /* exits the data sources pdp_temp loop */
809                    }
810                 }
811         
812                 /* make pdp_prep ready for the next run */
813                 if(isnan(pdp_new[i])){
814                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
815                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
816                 } else {
817                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
818                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
819                         pdp_new[i]/interval*post_int;
820                 }
821
822 #ifdef DEBUG
823                 fprintf(stderr,
824                         "PDP UPD ds[%lu]\t"
825                         "pdp_temp %10.2f\t"
826                         "new_prep %10.2f\t"
827                         "new_unkn_sec %5lu\n",
828                         i, pdp_temp[i],
829                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
830                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
831 #endif
832             }
833
834                 /* if there were errors during the last loop, bail out here */
835             if (rrd_test_error()){
836                free(step_start);
837                break;
838             }
839
840                 /* compute the number of elapsed pdp_st moments */
841                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
842 #ifdef DEBUG
843                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
844 #endif
845                 if (rra_step_cnt == NULL)
846                 {
847                    rra_step_cnt = (unsigned long *) 
848                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
849                 }
850
851             for(i = 0, rra_start = rra_begin;
852                 i < rrd.stat_head->rra_cnt;
853             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
854                 i++)
855                 {
856                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
857                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
858                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
859         if (start_pdp_offset <= elapsed_pdp_st) {
860            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
861                       rrd.rra_def[i].pdp_cnt + 1;
862             } else {
863                    rra_step_cnt[i] = 0;
864                 }
865
866                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
867                 {
868                    /* If this is a bulk update, we need to skip ahead in the seasonal
869                         * arrays so that they will be correct for the next observed value;
870                         * note that for the bulk update itself, no update will occur to
871                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
872                         * be set to DNAN. */
873            if (rra_step_cnt[i] > 2) 
874                    {
875                           /* skip update by resetting rra_step_cnt[i],
876                            * note that this is not data source specific; this is due
877                            * to the bulk update, not a DNAN value for the specific data
878                            * source. */
879                           rra_step_cnt[i] = 0;
880               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
881                              &last_seasonal_coef);
882                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
883                              &seasonal_coef);
884                    }
885                 
886                   /* periodically run a smoother for seasonal effects */
887                   /* Need to use first cdp parameter buffer to track
888                    * burnin (burnin requires a specific smoothing schedule).
889                    * The CDP_init_seasonal parameter is really an RRA level,
890                    * not a data source within RRA level parameter, but the rra_def
891                    * is read only for rrd_update (not flushed to disk). */
892                   iii = i*(rrd.stat_head -> ds_cnt);
893                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
894                           <= BURNIN_CYCLES)
895                   {
896                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
897                                  > rrd.rra_def[i].row_cnt - 1) {
898                            /* mark off one of the burnin cycles */
899                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
900                        schedule_smooth = 1;
901                          }  
902                   } else {
903                          /* someone has no doubt invented a trick to deal with this
904                           * wrap around, but at least this code is clear. */
905                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
906                              rrd.rra_ptr[i].cur_row)
907                          {
908                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
909                                   * mapping between PDP and CDP */
910                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
911                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
912                                  {
913 #ifdef DEBUG
914                                         fprintf(stderr,
915                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
916                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
917                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
918 #endif
919                                         schedule_smooth = 1;
920                                  }
921              } else {
922                                  /* can't rely on negative numbers because we are working with
923                                   * unsigned values */
924                                  /* Don't need modulus here. If we've wrapped more than once, only
925                                   * one smooth is executed at the end. */
926                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
927                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
928                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
929                                  {
930 #ifdef DEBUG
931                                         fprintf(stderr,
932                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
933                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
934                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
935 #endif
936                                         schedule_smooth = 1;
937                                  }
938                          }
939                   }
940
941               rra_current = ftell(rrd_file); 
942                 } /* if cf is DEVSEASONAL or SEASONAL */
943
944         if (rrd_test_error()) break;
945
946                     /* update CDP_PREP areas */
947                     /* loop over data soures within each RRA */
948                     for(ii = 0;
949                         ii < rrd.stat_head->ds_cnt;
950                         ii++)
951                         {
952                         
953                         /* iii indexes the CDP prep area for this data source within the RRA */
954                         iii=i*rrd.stat_head->ds_cnt+ii;
955
956                         if (rrd.rra_def[i].pdp_cnt > 1) {
957                           
958                            if (rra_step_cnt[i] > 0) {
959                            /* If we are in this block, as least 1 CDP value will be written to
960                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
961                                 * to be written, then the "fill in" value is the CDP_secondary_val
962                                 * entry. */
963                                   if (isnan(pdp_temp[ii]))
964                   {
965                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
966                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
967                                   } else {
968                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
969                                           * CDP data entries. No matter the CF, the value is the same because
970                                           * the average, max, min, and last of a list of identical values is
971                                           * the same, namely, the value itself. */
972                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
973                                   }
974                      
975                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
976                                       > rrd.rra_def[i].pdp_cnt*
977                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
978                                   {
979                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
980                                          /* initialize carry over */
981                                          if (current_cf == CF_AVERAGE) {
982                                                    if (isnan(pdp_temp[ii])) { 
983                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
984                                                    } else {
985                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
987                                                    }
988                                          } else {
989                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
990                                          }
991                                   } else {
992                                          rrd_value_t cum_val, cur_val; 
993                                      switch (current_cf) {
994                                                 case CF_AVERAGE:
995                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
996                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
997                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
998                                                (cum_val + cur_val * start_pdp_offset) /
999                                            (rrd.rra_def[i].pdp_cnt
1000                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1001                                                    /* initialize carry over value */
1002                                                    if (isnan(pdp_temp[ii])) { 
1003                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1004                                                    } else {
1005                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1006                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1007                                                    }
1008                                                    break;
1009                                                 case CF_MAXIMUM:
1010                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1011                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1012 #ifdef DEBUG
1013                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1014                                                           isnan(pdp_temp[ii])) {
1015                                                      fprintf(stderr,
1016                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1017                                                                 i,ii);
1018                                                          exit(-1);
1019                                                   }
1020 #endif
1021                                                   if (cur_val > cum_val)
1022                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1023                                                   else
1024                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1025                                                   /* initialize carry over value */
1026                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027                                                   break;
1028                                                 case CF_MINIMUM:
1029                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1030                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1031 #ifdef DEBUG
1032                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1033                                                           isnan(pdp_temp[ii])) {
1034                                                      fprintf(stderr,
1035                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1036                                                                 i,ii);
1037                                                          exit(-1);
1038                                                   }
1039 #endif
1040                                                   if (cur_val < cum_val)
1041                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1042                                                   else
1043                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1044                                                   /* initialize carry over value */
1045                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046                                                   break;
1047                                                 case CF_LAST:
1048                                                 default:
1049                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1050                                                    /* initialize carry over value */
1051                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052                                                 break;
1053                                          }
1054                                   } /* endif meets xff value requirement for a valid value */
1055                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1056                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1057                                   if (isnan(pdp_temp[ii]))
1058                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1059                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1060                                   else
1061                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1062                } else  /* rra_step_cnt[i]  == 0 */
1063                            {
1064 #ifdef DEBUG
1065                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1066                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1067                                          i,ii);
1068                                   } else {
1069                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1070                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1071                                   }
1072 #endif
1073                                   if (isnan(pdp_temp[ii])) {
1074                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1075                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1076                                   {
1077                                          if (current_cf == CF_AVERAGE) {
1078                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1079                                                    elapsed_pdp_st;
1080                                          } else {
1081                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082                                          }
1083 #ifdef DEBUG
1084                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1085                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1086 #endif
1087                                   } else {
1088                                          switch (current_cf) {
1089                                          case CF_AVERAGE:
1090                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1091                                                    elapsed_pdp_st;
1092                                                 break;
1093                                          case CF_MINIMUM:
1094                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096                                                 break; 
1097                                          case CF_MAXIMUM:
1098                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1099                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100                                                 break; 
1101                                          case CF_LAST:
1102                                          default:
1103                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1104                                                 break;
1105                                          }
1106                                   }
1107                            }
1108                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1109                            if (elapsed_pdp_st > 2)
1110                            {
1111                                    switch (current_cf) {
1112                                    case CF_AVERAGE:
1113                                    default:
1114                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1115                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1116                                           break;
1117                    case CF_SEASONAL:
1118                                    case CF_DEVSEASONAL:
1119                                           /* need to update cached seasonal values, so they are consistent
1120                                            * with the bulk update */
1121                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1122                                            * CDP_last_deviation are the same. */
1123                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1124                                                  last_seasonal_coef[ii];
1125                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1126                                                  seasonal_coef[ii];
1127                                           break;
1128                    case CF_HWPREDICT:
1129                                           /* need to update the null_count and last_null_count.
1130                                            * even do this for non-DNAN pdp_temp because the
1131                                            * algorithm is not learning from batch updates. */
1132                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1133                                                  elapsed_pdp_st;
1134                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1135                                                  elapsed_pdp_st - 1;
1136                                           /* fall through */
1137                                    case CF_DEVPREDICT:
1138                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1139                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1140                                           break;
1141                    case CF_FAILURES:
1142                                           /* do not count missed bulk values as failures */
1143                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1144                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1145                                           /* need to reset violations buffer.
1146                                            * could do this more carefully, but for now, just
1147                                            * assume a bulk update wipes away all violations. */
1148                       erase_violations(&rrd, iii, i);
1149                                           break;
1150                                    }
1151                            } 
1152                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1153
1154                         if (rrd_test_error()) break;
1155
1156                         } /* endif data sources loop */
1157         } /* end RRA Loop */
1158
1159                 /* this loop is only entered if elapsed_pdp_st < 3 */
1160                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1161                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1162                 {
1163                for(i = 0, rra_start = rra_begin;
1164                    i < rrd.stat_head->rra_cnt;
1165                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1166                    i++)
1167                    {
1168                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1169
1170                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1171                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1172                           {
1173                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1174                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1175                                 &seasonal_coef);
1176                  rra_current = ftell(rrd_file);
1177                           }
1178                           if (rrd_test_error()) break;
1179                       /* loop over data soures within each RRA */
1180                       for(ii = 0;
1181                           ii < rrd.stat_head->ds_cnt;
1182                           ii++)
1183                           {
1184                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1185                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1186                                     scratch_idx, seasonal_coef);
1187                           }
1188            } /* end RRA Loop */
1189                    if (rrd_test_error()) break;
1190             } /* end elapsed_pdp_st loop */
1191
1192                 if (rrd_test_error()) break;
1193
1194                 /* Ready to write to disk */
1195                 /* Move sequentially through the file, writing one RRA at a time.
1196                  * Note this architecture divorces the computation of CDP with
1197                  * flushing updated RRA entries to disk. */
1198             for(i = 0, rra_start = rra_begin;
1199                 i < rrd.stat_head->rra_cnt;
1200             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1201                 i++) {
1202                 /* is there anything to write for this RRA? If not, continue. */
1203         if (rra_step_cnt[i] == 0) continue;
1204
1205                 /* write the first row */
1206 #ifdef DEBUG
1207         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1208 #endif
1209             rrd.rra_ptr[i].cur_row++;
1210             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1211                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1212                 /* positition on the first row */
1213                 rra_pos_tmp = rra_start +
1214                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1215                 if(rra_pos_tmp != rra_current) {
1216 #ifndef HAVE_MMAP
1217                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1218                       rrd_set_error("seek error in rrd");
1219                       break;
1220                    }
1221 #endif
1222                    rra_current = rra_pos_tmp;
1223                 }
1224
1225 #ifdef DEBUG
1226             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1227 #endif
1228                 scratch_idx = CDP_primary_val;
1229                 if (pcdp_summary != NULL)
1230                 {
1231                    rra_time = (current_time - current_time 
1232                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1234                 }
1235 #ifdef HAVE_MMAP
1236                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1237                    pcdp_summary, &rra_time, rrd_mmaped_file);
1238 #else
1239                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1240                    pcdp_summary, &rra_time);
1241 #endif
1242                 if (rrd_test_error()) break;
1243
1244                 /* write other rows of the bulk update, if any */
1245                 scratch_idx = CDP_secondary_val;
1246                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1247                 {
1248                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1249                    {
1250 #ifdef DEBUG
1251               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1252                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1253 #endif
1254                           /* wrap */
1255                           rrd.rra_ptr[i].cur_row = 0;
1256                           /* seek back to beginning of current rra */
1257                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1258                           {
1259                          rrd_set_error("seek error in rrd");
1260                          break;
1261                           }
1262 #ifdef DEBUG
1263                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1264 #endif
1265                           rra_current = rra_start;
1266                    }
1267                    if (pcdp_summary != NULL)
1268                    {
1269                       rra_time = (current_time - current_time 
1270                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1271                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1272                    }
1273 #ifdef HAVE_MMAP
1274                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1275                       pcdp_summary, &rra_time, rrd_mmaped_file);
1276 #else
1277                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278                       pcdp_summary, &rra_time);
1279 #endif
1280                 }
1281                 
1282                 if (rrd_test_error())
1283                   break;
1284                 } /* RRA LOOP */
1285
1286             /* break out of the argument parsing loop if error_string is set */
1287             if (rrd_test_error()){
1288                    free(step_start);
1289                    break;
1290             } 
1291             
1292         } /* endif a pdp_st has occurred */ 
1293         rrd.live_head->last_up = current_time;
1294         rrd.live_head->last_up_usec = current_time_usec; 
1295         free(step_start);
1296     } /* function argument loop */
1297
1298     if (seasonal_coef != NULL) free(seasonal_coef);
1299     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1300         if (rra_step_cnt != NULL) free(rra_step_cnt);
1301     rpnstack_free(&rpnstack);
1302
1303 #ifdef HAVE_MMAP
1304     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1305             rrd_set_error("error writing(unmapping) file: %s", filename);
1306     }
1307 #endif    
1308     /* if we got here and if there is an error and if the file has not been
1309      * written to, then close things up and return. */
1310     if (rrd_test_error()) {
1311         free(updvals);
1312         free(tmpl_idx);
1313         rrd_free(&rrd);
1314         free(pdp_temp);
1315         free(pdp_new);
1316         fclose(rrd_file);
1317         return(-1);
1318     }
1319
1320     /* aargh ... that was tough ... so many loops ... anyway, its done.
1321      * we just need to write back the live header portion now*/
1322
1323     if (fseek(rrd_file, (sizeof(stat_head_t)
1324                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1325                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1326               SEEK_SET) != 0) {
1327         rrd_set_error("seek rrd for live header writeback");
1328         free(updvals);
1329         free(tmpl_idx);
1330         rrd_free(&rrd);
1331         free(pdp_temp);
1332         free(pdp_new);
1333         fclose(rrd_file);
1334         return(-1);
1335     }
1336
1337     if(version >= 3) {
1338             if(fwrite( rrd.live_head,
1339                        sizeof(live_head_t), 1, rrd_file) != 1){
1340                 rrd_set_error("fwrite live_head to rrd");
1341                 free(updvals);
1342                 rrd_free(&rrd);
1343                 free(tmpl_idx);
1344                 free(pdp_temp);
1345                 free(pdp_new);
1346                 fclose(rrd_file);
1347                 return(-1);
1348             }
1349     }
1350     else {
1351             if(fwrite( &rrd.live_head->last_up,
1352                        sizeof(time_t), 1, rrd_file) != 1){
1353                 rrd_set_error("fwrite live_head to rrd");
1354                 free(updvals);
1355                 rrd_free(&rrd);
1356                 free(tmpl_idx);
1357                 free(pdp_temp);
1358                 free(pdp_new);
1359                 fclose(rrd_file);
1360                 return(-1);
1361             }
1362     }
1363             
1364
1365     if(fwrite( rrd.pdp_prep,
1366                sizeof(pdp_prep_t),
1367                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1368         rrd_set_error("ftwrite pdp_prep to rrd");
1369         free(updvals);
1370         rrd_free(&rrd);
1371         free(tmpl_idx);
1372         free(pdp_temp);
1373         free(pdp_new);
1374         fclose(rrd_file);
1375         return(-1);
1376     }
1377
1378     if(fwrite( rrd.cdp_prep,
1379                sizeof(cdp_prep_t),
1380                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1381        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1382
1383         rrd_set_error("ftwrite cdp_prep to rrd");
1384         free(updvals);
1385         free(tmpl_idx);
1386         rrd_free(&rrd);
1387         free(pdp_temp);
1388         free(pdp_new);
1389         fclose(rrd_file);
1390         return(-1);
1391     }
1392
1393     if(fwrite( rrd.rra_ptr,
1394                sizeof(rra_ptr_t), 
1395                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1396         rrd_set_error("fwrite rra_ptr to rrd");
1397         free(updvals);
1398         free(tmpl_idx);
1399         rrd_free(&rrd);
1400         free(pdp_temp);
1401         free(pdp_new);
1402         fclose(rrd_file);
1403         return(-1);
1404     }
1405
1406     /* OK now close the files and free the memory */
1407     if(fclose(rrd_file) != 0){
1408         rrd_set_error("closing rrd");
1409         free(updvals);
1410         free(tmpl_idx);
1411         rrd_free(&rrd);
1412         free(pdp_temp);
1413         free(pdp_new);
1414         return(-1);
1415     }
1416
1417     /* calling the smoothing code here guarantees at most
1418          * one smoothing operation per rrd_update call. Unfortunately,
1419          * it is possible with bulk updates, or a long-delayed update
1420          * for smoothing to occur off-schedule. This really isn't
1421          * critical except during the burning cycles. */
1422         if (schedule_smooth)
1423         {
1424           rrd_file = fopen(filename,"rb+");
1425           rra_start = rra_begin;
1426           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1427           {
1428             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430             {
1431 #ifdef DEBUG
1432               fprintf(stderr,"Running smoother for rra %ld\n",i);
1433 #endif
1434               apply_smoother(&rrd,i,rra_start,rrd_file);
1435               if (rrd_test_error())
1436                 break;
1437             }
1438             rra_start += rrd.rra_def[i].row_cnt
1439               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1440           }
1441           fclose(rrd_file);
1442         }
1443     rrd_free(&rrd);
1444     free(updvals);
1445     free(tmpl_idx);
1446     free(pdp_new);
1447     free(pdp_temp);
1448     return(0);
1449 }
1450
1451 /*
1452  * get exclusive lock to whole file.
1453  * lock gets removed when we close the file
1454  *
1455  * returns 0 on success
1456  */
1457 int
1458 LockRRD(FILE *rrdfile)
1459 {
1460     int rrd_fd;         /* File descriptor for RRD */
1461     int rcstat;
1462
1463     rrd_fd = fileno(rrdfile);
1464
1465         {
1466 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1467     struct _stat st;
1468
1469     if ( _fstat( rrd_fd, &st ) == 0 ) {
1470             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1471     } else {
1472             rcstat = -1;
1473     }
1474 #else
1475     struct flock        lock;
1476     lock.l_type = F_WRLCK;    /* exclusive write lock */
1477     lock.l_len = 0;           /* whole file */
1478     lock.l_start = 0;         /* start of file */
1479     lock.l_whence = SEEK_SET;   /* end of file */
1480
1481     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1482 #endif
1483         }
1484
1485     return(rcstat);
1486 }
1487
1488
1489 #ifdef HAVE_MMAP
1490 info_t
1491 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1492                unsigned short CDP_scratch_idx, 
1493 #ifndef DEBUG
1494 FILE UNUSED(*rrd_file),
1495 #else
1496 FILE *rrd_file,
1497 #endif
1498                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1499 #else
1500 info_t
1501 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1502                unsigned short CDP_scratch_idx, FILE *rrd_file,
1503                    info_t *pcdp_summary, time_t *rra_time)
1504 #endif
1505 {
1506    unsigned long ds_idx, cdp_idx;
1507    infoval iv;
1508   
1509    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1510    {
1511       /* compute the cdp index */
1512       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1513 #ifdef DEBUG
1514           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1515              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1516              rrd -> rra_def[rra_idx].cf_nam);
1517 #endif 
1518       if (pcdp_summary != NULL)
1519           {
1520              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1521              /* append info to the return hash */
1522                  pcdp_summary = info_push(pcdp_summary,
1523                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1524                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1525                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1526          RD_I_VAL, iv);
1527           }
1528 #ifdef HAVE_MMAP
1529           memcpy((char *)rrd_mmaped_file + *rra_current,
1530                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1531                           sizeof(rrd_value_t));
1532 #else
1533           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1534                  sizeof(rrd_value_t),1,rrd_file) != 1)
1535           { 
1536              rrd_set_error("writing rrd");
1537              return 0;
1538           }
1539 #endif
1540           *rra_current += sizeof(rrd_value_t);
1541         }
1542         return (pcdp_summary);
1543 }