rrd files should NOT change size ever ... bulk update code wa buggy.
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.13  2003/11/11 19:38:03  oetiker
9  * rrd files should NOT change size ever ... bulk update code wa buggy.
10  * -- David M. Grimes <dgrimes@navisite.com>
11  *
12  * Revision 1.12  2003/09/04 13:16:12  oetiker
13  * should not assigne but compare ... grrrrr
14  *
15  * Revision 1.11  2003/09/02 21:58:35  oetiker
16  * be pickier about what we accept in rrd_update. Complain if things do not work out
17  *
18  * Revision 1.10  2003/04/29 19:14:12  jake
19  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
20  * Also revert accidental addition of -I to aclocal MakeMakefile.
21  *
22  * Revision 1.9  2003/04/25 18:35:08  jake
23  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
24  *
25  * Revision 1.8  2003/03/31 21:22:12  oetiker
26  * enables RRDtool updates with microsecond or in case of windows millisecond
27  * precision. This is needed to reduce time measurement error when archive step
28  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
29  *
30  * Revision 1.7  2003/02/13 07:05:27  oetiker
31  * Find attached the patch I promised to send to you. Please note that there
32  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
33  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
34  * library is identical to librrd, but it contains support code for per-thread
35  * global variables currently used for error information only. This is similar
36  * to how errno per-thread variables are implemented.  librrd_th must be linked
37  * alongside of libpthred
38  *
39  * There is also a new file "THREADS", holding some documentation.
40  *
41  * -- Peter Stamfest <peter@stamfest.at>
42  *
43  * Revision 1.6  2002/02/01 20:34:49  oetiker
44  * fixed version number and date/time
45  *
46  * Revision 1.5  2001/05/09 05:31:01  oetiker
47  * Bug fix: when update of multiple PDP/CDP RRAs coincided
48  * with interpolation of multiple PDPs an incorrect value was
49  * stored as the CDP. Especially evident for GAUGE data sources.
50  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
51  *
52  * Revision 1.4  2001/03/10 23:54:41  oetiker
53  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
54  * parser and calculator from rrd_graph and puts then in a new file,
55  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
56  * clean-up of aberrant behavior stuff, including a bug fix.
57  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
58  * -- Jake Brutlag <jakeb@corp.webtv.net>
59  *
60  * Revision 1.3  2001/03/04 13:01:55  oetiker
61  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
62  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
63  * This is backwards compatible! But new files using the Aberrant stuff are not readable
64  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
65  * -- Jake Brutlag <jakeb@corp.webtv.net>
66  *
67  * Revision 1.2  2001/03/04 11:14:25  oetiker
68  * added at-style-time@value:value syntax to rrd_update
69  * --  Dave Bodenstab <imdave@mcs.net>
70  *
71  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
72  * checkin
73  *
74  *****************************************************************************/
75
76 #include "rrd_tool.h"
77 #include <sys/types.h>
78 #include <fcntl.h>
79
80 #ifdef WIN32
81  #include <sys/locking.h>
82  #include <sys/stat.h>
83  #include <io.h>
84 #endif
85
86 #include "rrd_hw.h"
87 #include "rrd_rpncalc.h"
88
89 #include "rrd_is_thread_safe.h"
90
91 #ifdef WIN32
92 /*
93  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
94  * replacement.
95  */
96 #include <sys/timeb.h>
97
98 struct timeval {
99         time_t tv_sec; /* seconds */
100         long tv_usec;  /* microseconds */
101 };
102
103 struct __timezone {
104         int  tz_minuteswest; /* minutes W of Greenwich */
105         int  tz_dsttime;     /* type of dst correction */
106 };
107
108 static gettimeofday(struct timeval *t, struct __timezone *tz) {
109         
110         struct timeb current_time;
111
112         _ftime(&current_time);
113         
114         t->tv_sec  = current_time.time;
115         t->tv_usec = current_time.millitm * 1000;
116 }
117
118 #endif
119 /*
120  * normilize time as returned by gettimeofday. usec part must
121  * be always >= 0
122  */
123 static void normalize_time(struct timeval *t)
124 {
125         if(t->tv_usec < 0) {
126                 t->tv_sec--;
127                 t->tv_usec += 1000000L;
128         }
129 }
130
131 /* Local prototypes */
132 int LockRRD(FILE *rrd_file);
133 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
134                                         unsigned long *rra_current,
135                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
136                                         info_t *pcdp_summary, time_t *rra_time);
137 int rrd_update_r(char *filename, char *template, int argc, char **argv);
138 int _rrd_update(char *filename, char *template, int argc, char **argv, 
139                                         info_t*);
140
141 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
142
143
144 #ifdef STANDALONE
145 int 
146 main(int argc, char **argv){
147         rrd_update(argc,argv);
148         if (rrd_test_error()) {
149                 printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
150                         "Usage: rrdupdate filename\n"
151                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
152                         "\t\t\ttime|N:value[:value...]\n\n"
153                         "\t\t\tat-time@value[:value...]\n\n"
154                         "\t\t\t[ time:value[:value...] ..]\n\n");
155                                    
156                 printf("ERROR: %s\n",rrd_get_error());
157                 rrd_clear_error();                                                            
158                 return 1;
159         }
160         return 0;
161 }
162 #endif
163
164 info_t *rrd_update_v(int argc, char **argv)
165 {
166     char             *template = NULL;          
167         info_t *result = NULL;
168         infoval rc;
169
170     while (1) {
171                 static struct option long_options[] =
172                         {
173                                 {"template",      required_argument, 0, 't'},
174                                 {0,0,0,0}
175                         };
176                 int option_index = 0;
177                 int opt;
178                 opt = getopt_long(argc, argv, "t:", 
179                                                   long_options, &option_index);
180                 
181                 if (opt == EOF)
182                         break;
183                 
184                 switch(opt) {
185                 case 't':
186                         template = optarg;
187                         break;
188                 
189                 case '?':
190                         rrd_set_error("unknown option '%s'",argv[optind-1]);
191             rc.u_int = -1;
192                         goto end_tag;
193                 }
194     }
195
196     /* need at least 2 arguments: filename, data. */
197     if (argc-optind < 2) {
198                 rrd_set_error("Not enough arguments");
199         rc.u_int = -1;
200                 goto end_tag;
201     }
202     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
203         rc.u_int = _rrd_update(argv[optind], template,
204                       argc - optind - 1, argv + optind + 1, result);
205     result->value.u_int = rc.u_int;
206 end_tag:
207     return result;
208 }
209
210 int
211 rrd_update(int argc, char **argv)
212 {
213     char             *template = NULL;          
214     int rc;
215
216     while (1) {
217                 static struct option long_options[] =
218                         {
219                                 {"template",      required_argument, 0, 't'},
220                                 {0,0,0,0}
221                         };
222                 int option_index = 0;
223                 int opt;
224                 opt = getopt_long(argc, argv, "t:", 
225                                                   long_options, &option_index);
226                 
227                 if (opt == EOF)
228                         break;
229                 
230                 switch(opt) {
231                 case 't':
232                         template = optarg;
233                         break;
234                 
235                 case '?':
236                         rrd_set_error("unknown option '%s'",argv[optind-1]);
237                         return(-1);
238                 }
239     }
240
241     /* need at least 2 arguments: filename, data. */
242     if (argc-optind < 2) {
243                 rrd_set_error("Not enough arguments");
244
245                 return -1;
246     }
247  
248         rc = rrd_update_r(argv[optind], template,
249                       argc - optind - 1, argv + optind + 1);
250     return rc;
251 }
252
253 int
254 rrd_update_r(char *filename, char *template, int argc, char **argv)
255 {
256    return _rrd_update(filename, template, argc, argv, NULL);
257 }
258
259 int
260 _rrd_update(char *filename, char *template, int argc, char **argv, 
261    info_t *pcdp_summary)
262 {
263
264     int              arg_i = 2;
265     short            j;
266     unsigned long    i,ii,iii=1;
267
268     unsigned long    rra_begin;          /* byte pointer to the rra
269                                           * area in the rrd file.  this
270                                           * pointer never changes value */
271     unsigned long    rra_start;          /* byte pointer to the rra
272                                           * area in the rrd file.  this
273                                           * pointer changes as each rrd is
274                                           * processed. */
275     unsigned long    rra_current;        /* byte pointer to the current write
276                                           * spot in the rrd file. */
277     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
278     double           interval,
279         pre_int,post_int;                /* interval between this and
280                                           * the last run */
281     unsigned long    proc_pdp_st;        /* which pdp_st was the last
282                                           * to be processed */
283     unsigned long    occu_pdp_st;        /* when was the pdp_st
284                                           * before the last update
285                                           * time */
286     unsigned long    proc_pdp_age;       /* how old was the data in
287                                           * the pdp prep area when it
288                                           * was last updated */
289     unsigned long    occu_pdp_age;       /* how long ago was the last
290                                           * pdp_step time */
291     rrd_value_t      *pdp_new;           /* prepare the incoming data
292                                           * to be added the the
293                                           * existing entry */
294     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
295                                           * to be added the the
296                                           * cdp values */
297
298     long             *tmpl_idx;          /* index representing the settings
299                                             transported by the template index */
300     unsigned long    tmpl_cnt = 2;       /* time and data */
301
302     FILE             *rrd_file;
303     rrd_t            rrd;
304     time_t           current_time;
305         time_t           rra_time; /* time of update for a RRA */
306     unsigned long    current_time_usec;  /* microseconds part of current time */
307     struct timeval   tmp_time;           /* used for time conversion */
308
309     char             **updvals;
310     int              schedule_smooth = 0;
311         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
312                                          /* a vector of future Holt-Winters seasonal coefs */
313     unsigned long    elapsed_pdp_st;
314                                          /* number of elapsed PDP steps since last update */
315     unsigned long    *rra_step_cnt = NULL;
316                                          /* number of rows to be updated in an RRA for a data
317                                           * value. */
318     unsigned long    start_pdp_offset;
319                                          /* number of PDP steps since the last update that
320                                           * are assigned to the first CDP to be generated
321                                           * since the last update. */
322     unsigned short   scratch_idx;
323                                          /* index into the CDP scratch array */
324     enum cf_en       current_cf;
325                                          /* numeric id of the current consolidation function */
326     rpnstack_t       rpnstack; /* used for COMPUTE DS */
327     int              version;  /* rrd version */
328     char             *endptr; /* used in the conversion */
329
330     rpnstack_init(&rpnstack);
331
332     /* need at least 1 arguments: data. */
333     if (argc < 1) {
334         rrd_set_error("Not enough arguments");
335         return -1;
336     }
337     
338     
339
340     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
341         return -1;
342     }
343     /* initialize time */
344     version = atoi(rrd.stat_head->version);
345     gettimeofday(&tmp_time, 0);
346     normalize_time(&tmp_time);
347     current_time = tmp_time.tv_sec;
348     if(version >= 3) {
349         current_time_usec = tmp_time.tv_usec;
350     }
351     else {
352         current_time_usec = 0;
353     }
354
355     rra_current = rra_start = rra_begin = ftell(rrd_file);
356     /* This is defined in the ANSI C standard, section 7.9.5.3:
357
358         When a file is opened with udpate mode ('+' as the second
359         or third character in the ... list of mode argument
360         variables), both input and ouptut may be performed on the
361         associated stream.  However, ...  input may not be directly
362         followed by output without an intervening call to a file
363         positioning function, unless the input oepration encounters
364         end-of-file. */
365     fseek(rrd_file, 0, SEEK_CUR);
366
367     
368     /* get exclusive lock to whole file.
369      * lock gets removed when we close the file.
370      */
371     if (LockRRD(rrd_file) != 0) {
372       rrd_set_error("could not lock RRD");
373       rrd_free(&rrd);
374       fclose(rrd_file);
375       return(-1);   
376     } 
377
378     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
379         rrd_set_error("allocating updvals pointer array");
380         rrd_free(&rrd);
381         fclose(rrd_file);
382         return(-1);
383     }
384
385     if ((pdp_temp = malloc(sizeof(rrd_value_t)
386                            *rrd.stat_head->ds_cnt))==NULL){
387         rrd_set_error("allocating pdp_temp ...");
388         free(updvals);
389         rrd_free(&rrd);
390         fclose(rrd_file);
391         return(-1);
392     }
393
394     if ((tmpl_idx = malloc(sizeof(unsigned long)
395                            *(rrd.stat_head->ds_cnt+1)))==NULL){
396         rrd_set_error("allocating tmpl_idx ...");
397         free(pdp_temp);
398         free(updvals);
399         rrd_free(&rrd);
400         fclose(rrd_file);
401         return(-1);
402     }
403     /* initialize template redirector */
404     /* default config example (assume DS 1 is a CDEF DS)
405        tmpl_idx[0] -> 0; (time)
406        tmpl_idx[1] -> 1; (DS 0)
407        tmpl_idx[2] -> 3; (DS 2)
408        tmpl_idx[3] -> 4; (DS 3) */
409     tmpl_idx[0] = 0; /* time */
410     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
411         {
412            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
413               tmpl_idx[ii++]=i;
414         }
415     tmpl_cnt= ii;
416
417     if (template) {
418         char *dsname;
419         unsigned int tmpl_len;
420         dsname = template;
421         tmpl_cnt = 1; /* the first entry is the time */
422         tmpl_len = strlen(template);
423         for(i=0;i<=tmpl_len ;i++) {
424             if (template[i] == ':' || template[i] == '\0') {
425                 template[i] = '\0';
426                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
427                     rrd_set_error("Template contains more DS definitions than RRD");
428                     free(updvals); free(pdp_temp);
429                     free(tmpl_idx); rrd_free(&rrd);
430                     fclose(rrd_file); return(-1);
431                 }
432                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
433                     rrd_set_error("unknown DS name '%s'",dsname);
434                     free(updvals); free(pdp_temp);
435                     free(tmpl_idx); rrd_free(&rrd);
436                     fclose(rrd_file); return(-1);
437                 } else {
438                   /* the first element is always the time */
439                   tmpl_idx[tmpl_cnt-1]++; 
440                   /* go to the next entry on the template */
441                   dsname = &template[i+1];
442                   /* fix the damage we did before */
443                   if (i<tmpl_len) {
444                      template[i]=':';
445                   } 
446
447                 }
448             }       
449         }
450     }
451     if ((pdp_new = malloc(sizeof(rrd_value_t)
452                           *rrd.stat_head->ds_cnt))==NULL){
453         rrd_set_error("allocating pdp_new ...");
454         free(updvals);
455         free(pdp_temp);
456         free(tmpl_idx);
457         rrd_free(&rrd);
458         fclose(rrd_file);
459         return(-1);
460     }
461
462     /* loop through the arguments. */
463     for(arg_i=0; arg_i<argc;arg_i++) {
464         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
465         char *step_start = stepper;
466         char *p;
467         char *parsetime_error = NULL;
468         enum {atstyle, normal} timesyntax;
469         struct time_value ds_tv;
470         if (stepper == NULL){
471                 rrd_set_error("failed duplication argv entry");
472                 free(updvals);
473                 free(pdp_temp);  
474                 free(tmpl_idx);
475                 rrd_free(&rrd);
476                 fclose(rrd_file);
477                 return(-1);
478          }
479         /* initialize all ds input to unknown except the first one
480            which has always got to be set */
481         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
482         strcpy(stepper,argv[arg_i]);
483         updvals[0]=stepper;
484         /* separate all ds elements; first must be examined separately
485            due to alternate time syntax */
486         if ((p=strchr(stepper,'@'))!=NULL) {
487             timesyntax = atstyle;
488             *p = '\0';
489             stepper = p+1;
490         } else if ((p=strchr(stepper,':'))!=NULL) {
491             timesyntax = normal;
492             *p = '\0';
493             stepper = p+1;
494         } else {
495             rrd_set_error("expected timestamp not found in data source from %s:...",
496                           argv[arg_i]);
497             free(step_start);
498             break;
499         }
500         ii=1;
501         updvals[tmpl_idx[ii]] = stepper;
502         while (*stepper) {
503             if (*stepper == ':') {
504                 *stepper = '\0';
505                 ii++;
506                 if (ii<tmpl_cnt){                   
507                     updvals[tmpl_idx[ii]] = stepper+1;
508                 }
509             }
510             stepper++;
511         }
512
513         if (ii != tmpl_cnt-1) {
514             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
515                           tmpl_cnt-1, ii, argv[arg_i]);
516             free(step_start);
517             break;
518         }
519         
520         /* get the time from the reading ... handle N */
521         if (timesyntax == atstyle) {
522             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
523                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
524                 free(step_start);
525                 break;
526             }
527             if (ds_tv.type == RELATIVE_TO_END_TIME ||
528                 ds_tv.type == RELATIVE_TO_START_TIME) {
529                 rrd_set_error("specifying time relative to the 'start' "
530                               "or 'end' makes no sense here: %s",
531                               updvals[0]);
532                 free(step_start);
533                 break;
534             }
535
536             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
537             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
538             
539         } else if (strcmp(updvals[0],"N")==0){
540             gettimeofday(&tmp_time, 0);
541             normalize_time(&tmp_time);
542             current_time = tmp_time.tv_sec;
543             current_time_usec = tmp_time.tv_usec;
544         } else {
545             double tmp;
546             tmp = strtod(updvals[0], 0);
547             current_time = floor(tmp);
548             current_time_usec = (long)((tmp - current_time) * 1000000L);
549         }
550         /* dont do any correction for old version RRDs */
551         if(version < 3) 
552             current_time_usec = 0;
553         
554         if(current_time <= rrd.live_head->last_up){
555             rrd_set_error("illegal attempt to update using time %ld when "
556                           "last update time is %ld (minimum one second step)",
557                           current_time, rrd.live_head->last_up);
558             free(step_start);
559             break;
560         }
561         
562         
563         /* seek to the beginning of the rra's */
564         if (rra_current != rra_begin) {
565             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
566                 rrd_set_error("seek error in rrd");
567                 free(step_start);
568                 break;
569             }
570             rra_current = rra_begin;
571         }
572         rra_start = rra_begin;
573
574         /* when was the current pdp started */
575         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
576         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
577
578         /* when did the last pdp_st occur */
579         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
580         occu_pdp_st = current_time - occu_pdp_age;
581         /* interval = current_time - rrd.live_head->last_up; */
582         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
583     
584         if (occu_pdp_st > proc_pdp_st){
585             /* OK we passed the pdp_st moment*/
586             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
587                                                               * occurred before the latest
588                                                               * pdp_st moment*/
589             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
590             post_int = occu_pdp_age;                         /* how much after it */
591             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
592         } else {
593             pre_int = interval;
594             post_int = 0;
595         }
596
597 #ifdef DEBUG
598         printf(
599                "proc_pdp_age %lu\t"
600                "proc_pdp_st %lu\t" 
601                "occu_pfp_age %lu\t" 
602                "occu_pdp_st %lu\t"
603                "int %lf\t"
604                "pre_int %lf\t"
605                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
606                 occu_pdp_age, occu_pdp_st,
607                interval, pre_int, post_int);
608 #endif
609     
610         /* process the data sources and update the pdp_prep 
611          * area accordingly */
612         for(i=0;i<rrd.stat_head->ds_cnt;i++){
613             enum dst_en dst_idx;
614             dst_idx= dst_conv(rrd.ds_def[i].dst);
615                 /* NOTE: DST_CDEF should never enter this if block, because
616                  * updvals[i+1][0] is initialized to 'U'; unless the caller
617                  * accidently specified a value for the DST_CDEF. To handle 
618                  * this case, an extra check is required. */
619             if((updvals[i+1][0] != 'U') &&
620                    (dst_idx != DST_CDEF) &&
621                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
622                double rate = DNAN;
623                /* the data source type defines how to process the data */
624                 /* pdp_new contains rate * time ... eg the bytes
625                  * transferred during the interval. Doing it this way saves
626                  * a lot of math operations */
627                 
628
629                 switch(dst_idx){
630                 case DST_COUNTER:
631                 case DST_DERIVE:
632                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
633                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
634                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
635                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
636                                  break;
637                             }
638                        }
639                        if (rrd_test_error()){
640                             break;
641                        }
642                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
643                        if(dst_idx == DST_COUNTER) {
644                           /* simple overflow catcher sugestet by andres kroonmaa */
645                           /* this will fail terribly for non 32 or 64 bit counters ... */
646                           /* are there any others in SNMP land ? */
647                           if (pdp_new[i] < (double)0.0 ) 
648                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
649                           if (pdp_new[i] < (double)0.0 ) 
650                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
651                        }
652                        rate = pdp_new[i] / interval;
653                     }
654                    else {
655                      pdp_new[i]= DNAN;          
656                    }
657                    break;
658                 case DST_ABSOLUTE:
659                     errno = 0;
660                     pdp_new[i] = strtod(updvals[i+1],&endptr);
661                     if (errno > 0){
662                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
663                         break;
664                     };
665                     if (endptr[0] != '\0'){
666                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
667                         break;
668                     }
669                     rate = pdp_new[i] / interval;                 
670                     break;
671                 case DST_GAUGE:
672                     errno = 0;
673                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
674                     if (errno > 0){
675                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
676                         break;
677                     };
678                     if (endptr[0] != '\0'){
679                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
680                         break;
681                     }
682                     rate = pdp_new[i] / interval;                  
683                     break;
684                 default:
685                     rrd_set_error("rrd contains unknown DS type : '%s'",
686                                   rrd.ds_def[i].dst);
687                     break;
688                 }
689                 /* break out of this for loop if the error string is set */
690                 if (rrd_test_error()){
691                     break;
692                 }
693                /* make sure pdp_temp is neither too large or too small
694                 * if any of these occur it becomes unknown ...
695                 * sorry folks ... */
696                if ( ! isnan(rate) && 
697                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
698                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
699                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
700                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
701                   pdp_new[i] = DNAN;
702                }               
703             } else {
704                 /* no news is news all the same */
705                 pdp_new[i] = DNAN;
706             }
707             
708             /* make a copy of the command line argument for the next run */
709 #ifdef DEBUG
710             fprintf(stderr,
711                     "prep ds[%lu]\t"
712                     "last_arg '%s'\t"
713                     "this_arg '%s'\t"
714                     "pdp_new %10.2f\n",
715                     i,
716                     rrd.pdp_prep[i].last_ds,
717                     updvals[i+1], pdp_new[i]);
718 #endif
719             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
720                 strncpy(rrd.pdp_prep[i].last_ds,
721                         updvals[i+1],LAST_DS_LEN-1);
722                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
723             }
724         }
725         /* break out of the argument parsing loop if the error_string is set */
726         if (rrd_test_error()){
727             free(step_start);
728             break;
729         }
730         /* has a pdp_st moment occurred since the last run ? */
731
732         if (proc_pdp_st == occu_pdp_st){
733             /* no we have not passed a pdp_st moment. therefore update is simple */
734
735             for(i=0;i<rrd.stat_head->ds_cnt;i++){
736                 if(isnan(pdp_new[i]))
737                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
738                 else
739                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
740 #ifdef DEBUG
741                 fprintf(stderr,
742                         "NO PDP  ds[%lu]\t"
743                         "value %10.2f\t"
744                         "unkn_sec %5lu\n",
745                         i,
746                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
747                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
748 #endif
749             }   
750         } else {
751             /* an pdp_st has occurred. */
752
753             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
754              * occurred up to the last run.        
755             pdp_new[] contains rate*seconds from the latest run.
756             pdp_temp[] will contain the rate for cdp */
757
758             for(i=0;i<rrd.stat_head->ds_cnt;i++){
759                 /* update pdp_prep to the current pdp_st */
760                 if(isnan(pdp_new[i]))
761                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
762                 else
763                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
764                         pdp_new[i]/(double)interval*(double)pre_int;
765
766                 /* if too much of the pdp_prep is unknown we dump it */
767                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
768                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
769                     (occu_pdp_st-proc_pdp_st <= 
770                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
771                     pdp_temp[i] = DNAN;
772                 } else {
773                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
774                         / (double)( occu_pdp_st
775                                    - proc_pdp_st
776                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
777                 }
778
779                 /* process CDEF data sources; remember each CDEF DS can
780                  * only reference other DS with a lower index number */
781             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
782                    rpnp_t *rpnp;
783                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
784                    /* substitue data values for OP_VARIABLE nodes */
785                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
786                    {
787                           if (rpnp[ii].op == OP_VARIABLE) {
788                                  rpnp[ii].op = OP_NUMBER;
789                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
790                           }
791                    }
792                    /* run the rpn calculator */
793                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
794                           free(rpnp);
795                           break; /* exits the data sources pdp_temp loop */
796                    }
797                 }
798         
799                 /* make pdp_prep ready for the next run */
800                 if(isnan(pdp_new[i])){
801                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
802                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
803                 } else {
804                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
805                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
806                         pdp_new[i]/(double)interval*(double)post_int;
807                 }
808
809 #ifdef DEBUG
810                 fprintf(stderr,
811                         "PDP UPD ds[%lu]\t"
812                         "pdp_temp %10.2f\t"
813                         "new_prep %10.2f\t"
814                         "new_unkn_sec %5lu\n",
815                         i, pdp_temp[i],
816                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
817                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
818 #endif
819             }
820
821                 /* if there were errors during the last loop, bail out here */
822             if (rrd_test_error()){
823                free(step_start);
824                break;
825             }
826
827                 /* compute the number of elapsed pdp_st moments */
828                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
829 #ifdef DEBUG
830                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
831 #endif
832                 if (rra_step_cnt == NULL)
833                 {
834                    rra_step_cnt = (unsigned long *) 
835                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
836                 }
837
838             for(i = 0, rra_start = rra_begin;
839                 i < rrd.stat_head->rra_cnt;
840             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
841                 i++)
842                 {
843                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
844                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
845                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
846         if (start_pdp_offset <= elapsed_pdp_st) {
847            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
848                       rrd.rra_def[i].pdp_cnt + 1;
849             } else {
850                    rra_step_cnt[i] = 0;
851                 }
852
853                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
854                 {
855                    /* If this is a bulk update, we need to skip ahead in the seasonal
856                         * arrays so that they will be correct for the next observed value;
857                         * note that for the bulk update itself, no update will occur to
858                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
859                         * be set to DNAN. */
860            if (rra_step_cnt[i] > 2) 
861                    {
862                           /* skip update by resetting rra_step_cnt[i],
863                            * note that this is not data source specific; this is due
864                            * to the bulk update, not a DNAN value for the specific data
865                            * source. */
866                           rra_step_cnt[i] = 0;
867               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
868                              &last_seasonal_coef);
869                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
870                              &seasonal_coef);
871                    }
872                 
873                   /* periodically run a smoother for seasonal effects */
874                   /* Need to use first cdp parameter buffer to track
875                    * burnin (burnin requires a specific smoothing schedule).
876                    * The CDP_init_seasonal parameter is really an RRA level,
877                    * not a data source within RRA level parameter, but the rra_def
878                    * is read only for rrd_update (not flushed to disk). */
879                   iii = i*(rrd.stat_head -> ds_cnt);
880                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
881                           <= BURNIN_CYCLES)
882                   {
883                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
884                                  > rrd.rra_def[i].row_cnt - 1) {
885                            /* mark off one of the burnin cycles */
886                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
887                        schedule_smooth = 1;
888                          }  
889                   } else {
890                          /* someone has no doubt invented a trick to deal with this
891                           * wrap around, but at least this code is clear. */
892                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
893                              rrd.rra_ptr[i].cur_row)
894                          {
895                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
896                                   * mapping between PDP and CDP */
897                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
899                                  {
900 #ifdef DEBUG
901                                         fprintf(stderr,
902                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
904                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 #endif
906                                         schedule_smooth = 1;
907                                  }
908              } else {
909                                  /* can't rely on negative numbers because we are working with
910                                   * unsigned values */
911                                  /* Don't need modulus here. If we've wrapped more than once, only
912                                   * one smooth is executed at the end. */
913                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
914                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
915                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
916                                  {
917 #ifdef DEBUG
918                                         fprintf(stderr,
919                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
920                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
921                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
922 #endif
923                                         schedule_smooth = 1;
924                                  }
925                          }
926                   }
927
928               rra_current = ftell(rrd_file); 
929                 } /* if cf is DEVSEASONAL or SEASONAL */
930
931         if (rrd_test_error()) break;
932
933                     /* update CDP_PREP areas */
934                     /* loop over data soures within each RRA */
935                     for(ii = 0;
936                         ii < rrd.stat_head->ds_cnt;
937                         ii++)
938                         {
939                         
940                         /* iii indexes the CDP prep area for this data source within the RRA */
941                         iii=i*rrd.stat_head->ds_cnt+ii;
942
943                         if (rrd.rra_def[i].pdp_cnt > 1) {
944                           
945                            if (rra_step_cnt[i] > 0) {
946                            /* If we are in this block, as least 1 CDP value will be written to
947                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
948                                 * to be written, then the "fill in" value is the CDP_secondary_val
949                                 * entry. */
950                                   if (isnan(pdp_temp[ii]))
951                   {
952                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
953                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
954                                   } else {
955                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
956                                           * CDP data entries. No matter the CF, the value is the same because
957                                           * the average, max, min, and last of a list of identical values is
958                                           * the same, namely, the value itself. */
959                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
960                                   }
961                      
962                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
963                                       > rrd.rra_def[i].pdp_cnt*
964                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
965                                   {
966                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
967                                          /* initialize carry over */
968                                          if (current_cf == CF_AVERAGE) {
969                                                    if (isnan(pdp_temp[ii])) { 
970                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
971                                                    } else {
972                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
973                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
974                                                    }
975                                          } else {
976                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
977                                          }
978                                   } else {
979                                          rrd_value_t cum_val, cur_val; 
980                                      switch (current_cf) {
981                                                 case CF_AVERAGE:
982                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
983                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
984                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
985                                                (cum_val + cur_val * start_pdp_offset) /
986                                            (rrd.rra_def[i].pdp_cnt
987                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
988                                                    /* initialize carry over value */
989                                                    if (isnan(pdp_temp[ii])) { 
990                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
991                                                    } else {
992                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
993                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
994                                                    }
995                                                    break;
996                                                 case CF_MAXIMUM:
997                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
998                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
999 #ifdef DEBUG
1000                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1001                                                           isnan(pdp_temp[ii])) {
1002                                                      fprintf(stderr,
1003                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1004                                                                 i,ii);
1005                                                          exit(-1);
1006                                                   }
1007 #endif
1008                                                   if (cur_val > cum_val)
1009                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1010                                                   else
1011                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1012                                                   /* initialize carry over value */
1013                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1014                                                   break;
1015                                                 case CF_MINIMUM:
1016                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1017                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1018 #ifdef DEBUG
1019                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1020                                                           isnan(pdp_temp[ii])) {
1021                                                      fprintf(stderr,
1022                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1023                                                                 i,ii);
1024                                                          exit(-1);
1025                                                   }
1026 #endif
1027                                                   if (cur_val < cum_val)
1028                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1029                                                   else
1030                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1031                                                   /* initialize carry over value */
1032                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1033                                                   break;
1034                                                 case CF_LAST:
1035                                                 default:
1036                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1037                                                    /* initialize carry over value */
1038                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1039                                                 break;
1040                                          }
1041                                   } /* endif meets xff value requirement for a valid value */
1042                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1043                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1044                                   if (isnan(pdp_temp[ii]))
1045                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1046                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1047                                   else
1048                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1049                } else  /* rra_step_cnt[i]  == 0 */
1050                            {
1051 #ifdef DEBUG
1052                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1053                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1054                                          i,ii);
1055                                   } else {
1056                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1057                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1058                                   }
1059 #endif
1060                                   if (isnan(pdp_temp[ii])) {
1061                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1062                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1063                                   {
1064                                          if (current_cf == CF_AVERAGE) {
1065                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1066                                                    elapsed_pdp_st;
1067                                          } else {
1068                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1069                                          }
1070 #ifdef DEBUG
1071                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1072                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1073 #endif
1074                                   } else {
1075                                          switch (current_cf) {
1076                                          case CF_AVERAGE:
1077                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1078                                                    elapsed_pdp_st;
1079                                                 break;
1080                                          case CF_MINIMUM:
1081                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1082                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083                                                 break; 
1084                                          case CF_MAXIMUM:
1085                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1086                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1087                                                 break; 
1088                                          case CF_LAST:
1089                                          default:
1090                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091                                                 break;
1092                                          }
1093                                   }
1094                            }
1095                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1096                            if (elapsed_pdp_st > 2)
1097                            {
1098                                    switch (current_cf) {
1099                                    case CF_AVERAGE:
1100                                    default:
1101                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1102                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1103                                           break;
1104                    case CF_SEASONAL:
1105                                    case CF_DEVSEASONAL:
1106                                           /* need to update cached seasonal values, so they are consistent
1107                                            * with the bulk update */
1108                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1109                                            * CDP_last_deviation are the same. */
1110                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1111                                                  last_seasonal_coef[ii];
1112                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1113                                                  seasonal_coef[ii];
1114                                           break;
1115                    case CF_HWPREDICT:
1116                                           /* need to update the null_count and last_null_count.
1117                                            * even do this for non-DNAN pdp_temp because the
1118                                            * algorithm is not learning from batch updates. */
1119                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1120                                                  elapsed_pdp_st;
1121                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1122                                                  elapsed_pdp_st - 1;
1123                                           /* fall through */
1124                                    case CF_DEVPREDICT:
1125                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1126                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1127                                           break;
1128                    case CF_FAILURES:
1129                                           /* do not count missed bulk values as failures */
1130                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1131                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1132                                           /* need to reset violations buffer.
1133                                            * could do this more carefully, but for now, just
1134                                            * assume a bulk update wipes away all violations. */
1135                       erase_violations(&rrd, iii, i);
1136                                           break;
1137                                    }
1138                            } 
1139                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1140
1141                         if (rrd_test_error()) break;
1142
1143                         } /* endif data sources loop */
1144         } /* end RRA Loop */
1145
1146                 /* this loop is only entered if elapsed_pdp_st < 3 */
1147                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1148                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1149                 {
1150                for(i = 0, rra_start = rra_begin;
1151                    i < rrd.stat_head->rra_cnt;
1152                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1153                    i++)
1154                    {
1155                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1156
1157                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1158                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1159                           {
1160                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1161                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1162                                 &seasonal_coef);
1163                  rra_current = ftell(rrd_file);
1164                           }
1165                           if (rrd_test_error()) break;
1166                       /* loop over data soures within each RRA */
1167                       for(ii = 0;
1168                           ii < rrd.stat_head->ds_cnt;
1169                           ii++)
1170                           {
1171                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1172                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1173                                     scratch_idx, seasonal_coef);
1174                           }
1175            } /* end RRA Loop */
1176                    if (rrd_test_error()) break;
1177             } /* end elapsed_pdp_st loop */
1178
1179                 if (rrd_test_error()) break;
1180
1181                 /* Ready to write to disk */
1182                 /* Move sequentially through the file, writing one RRA at a time.
1183                  * Note this architecture divorces the computation of CDP with
1184                  * flushing updated RRA entries to disk. */
1185             for(i = 0, rra_start = rra_begin;
1186                 i < rrd.stat_head->rra_cnt;
1187             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1188                 i++) {
1189                 /* is there anything to write for this RRA? If not, continue. */
1190         if (rra_step_cnt[i] == 0) continue;
1191
1192                 /* write the first row */
1193 #ifdef DEBUG
1194         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1195 #endif
1196             rrd.rra_ptr[i].cur_row++;
1197             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1198                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1199                 /* positition on the first row */
1200                 rra_pos_tmp = rra_start +
1201                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1202                 if(rra_pos_tmp != rra_current) {
1203                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1204                       rrd_set_error("seek error in rrd");
1205                       break;
1206                    }
1207                    rra_current = rra_pos_tmp;
1208                 }
1209
1210 #ifdef DEBUG
1211             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1212 #endif
1213                 scratch_idx = CDP_primary_val;
1214                 if (pcdp_summary != NULL)
1215                 {
1216                    rra_time = (current_time - current_time 
1217                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1218                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1219                 }
1220                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1221                    pcdp_summary, &rra_time);
1222                 if (rrd_test_error()) break;
1223
1224                 /* write other rows of the bulk update, if any */
1225                 scratch_idx = CDP_secondary_val;
1226                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1227                 {
1228                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1229                    {
1230 #ifdef DEBUG
1231               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1232                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1233 #endif
1234                           /* wrap */
1235                           rrd.rra_ptr[i].cur_row = 0;
1236                           /* seek back to beginning of current rra */
1237                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1238                           {
1239                          rrd_set_error("seek error in rrd");
1240                          break;
1241                           }
1242 #ifdef DEBUG
1243                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1244 #endif
1245                           rra_current = rra_start;
1246                    }
1247                    if (pcdp_summary != NULL)
1248                    {
1249                       rra_time = (current_time - current_time 
1250                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1251                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1252                    }
1253                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1254                       pcdp_summary, &rra_time);
1255                 }
1256                 
1257                 if (rrd_test_error())
1258                   break;
1259                 } /* RRA LOOP */
1260
1261             /* break out of the argument parsing loop if error_string is set */
1262             if (rrd_test_error()){
1263                    free(step_start);
1264                    break;
1265             } 
1266             
1267         } /* endif a pdp_st has occurred */ 
1268         rrd.live_head->last_up = current_time;
1269         rrd.live_head->last_up_usec = current_time_usec; 
1270         free(step_start);
1271     } /* function argument loop */
1272
1273     if (seasonal_coef != NULL) free(seasonal_coef);
1274     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1275         if (rra_step_cnt != NULL) free(rra_step_cnt);
1276     rpnstack_free(&rpnstack);
1277
1278     /* if we got here and if there is an error and if the file has not been
1279      * written to, then close things up and return. */
1280     if (rrd_test_error()) {
1281         free(updvals);
1282         free(tmpl_idx);
1283         rrd_free(&rrd);
1284         free(pdp_temp);
1285         free(pdp_new);
1286         fclose(rrd_file);
1287         return(-1);
1288     }
1289
1290     /* aargh ... that was tough ... so many loops ... anyway, its done.
1291      * we just need to write back the live header portion now*/
1292
1293     if (fseek(rrd_file, (sizeof(stat_head_t)
1294                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1295                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1296               SEEK_SET) != 0) {
1297         rrd_set_error("seek rrd for live header writeback");
1298         free(updvals);
1299         free(tmpl_idx);
1300         rrd_free(&rrd);
1301         free(pdp_temp);
1302         free(pdp_new);
1303         fclose(rrd_file);
1304         return(-1);
1305     }
1306
1307     if(version >= 3) {
1308             if(fwrite( rrd.live_head,
1309                        sizeof(live_head_t), 1, rrd_file) != 1){
1310                 rrd_set_error("fwrite live_head to rrd");
1311                 free(updvals);
1312                 rrd_free(&rrd);
1313                 free(tmpl_idx);
1314                 free(pdp_temp);
1315                 free(pdp_new);
1316                 fclose(rrd_file);
1317                 return(-1);
1318             }
1319     }
1320     else {
1321             if(fwrite( &rrd.live_head->last_up,
1322                        sizeof(time_t), 1, rrd_file) != 1){
1323                 rrd_set_error("fwrite live_head to rrd");
1324                 free(updvals);
1325                 rrd_free(&rrd);
1326                 free(tmpl_idx);
1327                 free(pdp_temp);
1328                 free(pdp_new);
1329                 fclose(rrd_file);
1330                 return(-1);
1331             }
1332     }
1333             
1334
1335     if(fwrite( rrd.pdp_prep,
1336                sizeof(pdp_prep_t),
1337                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338         rrd_set_error("ftwrite pdp_prep to rrd");
1339         free(updvals);
1340         rrd_free(&rrd);
1341         free(tmpl_idx);
1342         free(pdp_temp);
1343         free(pdp_new);
1344         fclose(rrd_file);
1345         return(-1);
1346     }
1347
1348     if(fwrite( rrd.cdp_prep,
1349                sizeof(cdp_prep_t),
1350                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1351        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1352
1353         rrd_set_error("ftwrite cdp_prep to rrd");
1354         free(updvals);
1355         free(tmpl_idx);
1356         rrd_free(&rrd);
1357         free(pdp_temp);
1358         free(pdp_new);
1359         fclose(rrd_file);
1360         return(-1);
1361     }
1362
1363     if(fwrite( rrd.rra_ptr,
1364                sizeof(rra_ptr_t), 
1365                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366         rrd_set_error("fwrite rra_ptr to rrd");
1367         free(updvals);
1368         free(tmpl_idx);
1369         rrd_free(&rrd);
1370         free(pdp_temp);
1371         free(pdp_new);
1372         fclose(rrd_file);
1373         return(-1);
1374     }
1375
1376     /* OK now close the files and free the memory */
1377     if(fclose(rrd_file) != 0){
1378         rrd_set_error("closing rrd");
1379         free(updvals);
1380         free(tmpl_idx);
1381         rrd_free(&rrd);
1382         free(pdp_temp);
1383         free(pdp_new);
1384         return(-1);
1385     }
1386
1387     /* calling the smoothing code here guarantees at most
1388          * one smoothing operation per rrd_update call. Unfortunately,
1389          * it is possible with bulk updates, or a long-delayed update
1390          * for smoothing to occur off-schedule. This really isn't
1391          * critical except during the burning cycles. */
1392         if (schedule_smooth)
1393         {
1394 #ifndef WIN32
1395           rrd_file = fopen(filename,"r+");
1396 #else
1397           rrd_file = fopen(filename,"rb+");
1398 #endif
1399           rra_start = rra_begin;
1400           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1401           {
1402             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1403                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1404             {
1405 #ifdef DEBUG
1406               fprintf(stderr,"Running smoother for rra %ld\n",i);
1407 #endif
1408               apply_smoother(&rrd,i,rra_start,rrd_file);
1409               if (rrd_test_error())
1410                 break;
1411             }
1412             rra_start += rrd.rra_def[i].row_cnt
1413               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1414           }
1415           fclose(rrd_file);
1416         }
1417     rrd_free(&rrd);
1418     free(updvals);
1419     free(tmpl_idx);
1420     free(pdp_new);
1421     free(pdp_temp);
1422     return(0);
1423 }
1424
1425 /*
1426  * get exclusive lock to whole file.
1427  * lock gets removed when we close the file
1428  *
1429  * returns 0 on success
1430  */
1431 int
1432 LockRRD(FILE *rrdfile)
1433 {
1434     int rrd_fd;         /* File descriptor for RRD */
1435     int                 stat;
1436
1437     rrd_fd = fileno(rrdfile);
1438
1439         {
1440 #ifndef WIN32    
1441                 struct flock    lock;
1442     lock.l_type = F_WRLCK;    /* exclusive write lock */
1443     lock.l_len = 0;           /* whole file */
1444     lock.l_start = 0;         /* start of file */
1445     lock.l_whence = SEEK_SET;   /* end of file */
1446
1447     stat = fcntl(rrd_fd, F_SETLK, &lock);
1448 #else
1449                 struct _stat st;
1450
1451                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1452                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1453                 } else {
1454                         stat = -1;
1455                 }
1456 #endif
1457         }
1458
1459     return(stat);
1460 }
1461
1462
1463 info_t
1464 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1465                unsigned short CDP_scratch_idx, FILE *rrd_file,
1466                    info_t *pcdp_summary, time_t *rra_time)
1467 {
1468    unsigned long ds_idx, cdp_idx;
1469    infoval iv;
1470   
1471    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1472    {
1473       /* compute the cdp index */
1474       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1475 #ifdef DEBUG
1476           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1477              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1478              rrd -> rra_def[rra_idx].cf_nam);
1479 #endif 
1480       if (pcdp_summary != NULL)
1481           {
1482              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1483              /* append info to the return hash */
1484                  pcdp_summary = info_push(pcdp_summary,
1485                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1486                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1487                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1488          RD_I_VAL, iv);
1489           }
1490           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1491                  sizeof(rrd_value_t),1,rrd_file) != 1)
1492           { 
1493              rrd_set_error("writing rrd");
1494              return 0;
1495           }
1496           *rra_current += sizeof(rrd_value_t);
1497         }
1498         return (pcdp_summary);
1499 }