957cf773b83fb1dcadc3a5f25971bba25b231e10
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2004
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.15  2004/05/25 20:51:49  oetiker
9  * Update displayed copyright messages to be consistent. -- Mike Slifcak
10  *
11  * Revision 1.14  2003/11/11 19:46:21  oetiker
12  * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
13  *
14  * Revision 1.13  2003/11/11 19:38:03  oetiker
15  * rrd files should NOT change size ever ... bulk update code wa buggy.
16  * -- David M. Grimes <dgrimes@navisite.com>
17  *
18  * Revision 1.12  2003/09/04 13:16:12  oetiker
19  * should not assigne but compare ... grrrrr
20  *
21  * Revision 1.11  2003/09/02 21:58:35  oetiker
22  * be pickier about what we accept in rrd_update. Complain if things do not work out
23  *
24  * Revision 1.10  2003/04/29 19:14:12  jake
25  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
26  * Also revert accidental addition of -I to aclocal MakeMakefile.
27  *
28  * Revision 1.9  2003/04/25 18:35:08  jake
29  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
30  *
31  * Revision 1.8  2003/03/31 21:22:12  oetiker
32  * enables RRDtool updates with microsecond or in case of windows millisecond
33  * precision. This is needed to reduce time measurement error when archive step
34  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
35  *
36  * Revision 1.7  2003/02/13 07:05:27  oetiker
37  * Find attached the patch I promised to send to you. Please note that there
38  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
39  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
40  * library is identical to librrd, but it contains support code for per-thread
41  * global variables currently used for error information only. This is similar
42  * to how errno per-thread variables are implemented.  librrd_th must be linked
43  * alongside of libpthred
44  *
45  * There is also a new file "THREADS", holding some documentation.
46  *
47  * -- Peter Stamfest <peter@stamfest.at>
48  *
49  * Revision 1.6  2002/02/01 20:34:49  oetiker
50  * fixed version number and date/time
51  *
52  * Revision 1.5  2001/05/09 05:31:01  oetiker
53  * Bug fix: when update of multiple PDP/CDP RRAs coincided
54  * with interpolation of multiple PDPs an incorrect value was
55  * stored as the CDP. Especially evident for GAUGE data sources.
56  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
57  *
58  * Revision 1.4  2001/03/10 23:54:41  oetiker
59  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
60  * parser and calculator from rrd_graph and puts then in a new file,
61  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
62  * clean-up of aberrant behavior stuff, including a bug fix.
63  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
64  * -- Jake Brutlag <jakeb@corp.webtv.net>
65  *
66  * Revision 1.3  2001/03/04 13:01:55  oetiker
67  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
68  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
69  * This is backwards compatible! But new files using the Aberrant stuff are not readable
70  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
71  * -- Jake Brutlag <jakeb@corp.webtv.net>
72  *
73  * Revision 1.2  2001/03/04 11:14:25  oetiker
74  * added at-style-time@value:value syntax to rrd_update
75  * --  Dave Bodenstab <imdave@mcs.net>
76  *
77  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
78  * checkin
79  *
80  *****************************************************************************/
81
82 #include "rrd_tool.h"
83 #include <sys/types.h>
84 #include <fcntl.h>
85
86 #ifdef WIN32
87  #include <sys/locking.h>
88  #include <sys/stat.h>
89  #include <io.h>
90 #endif
91
92 #include "rrd_hw.h"
93 #include "rrd_rpncalc.h"
94
95 #include "rrd_is_thread_safe.h"
96
#ifdef WIN32
/*
 * WIN32 has neither gettimeofday() nor struct timeval. This is a quick and
 * dirty replacement built on _ftime(), so the sub-second part only has
 * millisecond resolution.
 */
#include <sys/timeb.h>

struct timeval {
        time_t tv_sec; /* seconds */
        long tv_usec;  /* microseconds */
};

struct __timezone {
        int  tz_minuteswest; /* minutes W of Greenwich */
        int  tz_dsttime;     /* type of dst correction */
};

/*
 * Store the current wall-clock time in *t.
 *
 * t  - receives seconds and microseconds (milliseconds * 1000).
 * tz - accepted for signature compatibility only; never read or written.
 *
 * Always returns 0. The return type is spelled out explicitly: the old
 * "implicit int" declaration is not valid since C99, and the function
 * used to fall off its end without returning a value.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz) {

        struct timeb current_time;

        (void)tz; /* unused */

        _ftime(&current_time);

        t->tv_sec  = current_time.time;
        t->tv_usec = current_time.millitm * 1000;

        return 0;
}

#endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must always be >= 0. A negative tv_usec is repaired by borrowing one
 * second from tv_sec (a single borrow only).
 */
static void normalize_time(struct timeval *t)
{
        /* nothing to do when the microsecond part is already non-negative */
        if (t->tv_usec >= 0)
                return;

        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
136
137 /* Local prototypes */
138 int LockRRD(FILE *rrd_file);
139 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
140                                         unsigned long *rra_current,
141                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
142                                         info_t *pcdp_summary, time_t *rra_time);
143 int rrd_update_r(char *filename, char *template, int argc, char **argv);
144 int _rrd_update(char *filename, char *template, int argc, char **argv, 
145                                         info_t*);
146
/* Evaluates to X unless X is NaN, in which case it evaluates to Y.
 * X is expanded twice, so it must not have side effects.
 * Deliberately no trailing semicolon: the macro is an expression, and a
 * semicolon inside the expansion would silently cut off anything that
 * follows it in a larger expression (e.g. IFDNAN(a,b) + c). */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
148
149
#ifdef STANDALONE
/*
 * Stand-alone rrdupdate entry point.
 *
 * Hands the command line to rrd_update(). If an RRD error is pending
 * afterwards, prints the usage text followed by the error message to
 * stdout, clears the error and exits with 1; otherwise exits with 0.
 */
int
main(int argc, char **argv){
        static const char usage_text[] =
                "RRDtool 1.1.x  Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
                "Usage: rrdupdate filename\n"
                "\t\t\t[--template|-t ds-name:ds-name:...]\n"
                "\t\t\ttime|N:value[:value...]\n\n"
                "\t\t\tat-time@value[:value...]\n\n"
                "\t\t\t[ time:value[:value...] ..]\n\n";

        rrd_update(argc,argv);

        /* success: nothing to report */
        if (!rrd_test_error())
                return 0;

        fputs(usage_text, stdout);
        printf("ERROR: %s\n",rrd_get_error());
        rrd_clear_error();
        return 1;
}
#endif
169
170 info_t *rrd_update_v(int argc, char **argv)
171 {
172     char             *template = NULL;          
173         info_t *result = NULL;
174         infoval rc;
175
176     while (1) {
177                 static struct option long_options[] =
178                         {
179                                 {"template",      required_argument, 0, 't'},
180                                 {0,0,0,0}
181                         };
182                 int option_index = 0;
183                 int opt;
184                 opt = getopt_long(argc, argv, "t:", 
185                                                   long_options, &option_index);
186                 
187                 if (opt == EOF)
188                         break;
189                 
190                 switch(opt) {
191                 case 't':
192                         template = optarg;
193                         break;
194                 
195                 case '?':
196                         rrd_set_error("unknown option '%s'",argv[optind-1]);
197             rc.u_int = -1;
198                         goto end_tag;
199                 }
200     }
201
202     /* need at least 2 arguments: filename, data. */
203     if (argc-optind < 2) {
204                 rrd_set_error("Not enough arguments");
205         rc.u_int = -1;
206                 goto end_tag;
207     }
208     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
209         rc.u_int = _rrd_update(argv[optind], template,
210                       argc - optind - 1, argv + optind + 1, result);
211     result->value.u_int = rc.u_int;
212 end_tag:
213     return result;
214 }
215
/*
 * Command-line style update front end.
 *
 * Accepts an optional --template/-t option, then requires a file name
 * and at least one data argument. Returns -1 with the RRD error set for
 * an unknown option or too few arguments; otherwise returns whatever
 * rrd_update_r() returns for the remaining arguments.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
        {
            {"template",      required_argument, 0, 't'},
            {0,0,0,0}
        };
    char *template = NULL;
    int   option_index;
    int   opt;

    for (;;) {
        option_index = 0;
        opt = getopt_long(argc, argv, "t:",
                          long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc-optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    /* first non-option argument is the file name, the rest is data */
    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
258
259 int
260 rrd_update_r(char *filename, char *template, int argc, char **argv)
261 {
262    return _rrd_update(filename, template, argc, argv, NULL);
263 }
264
265 int
266 _rrd_update(char *filename, char *template, int argc, char **argv, 
267    info_t *pcdp_summary)
268 {
269
270     int              arg_i = 2;
271     short            j;
272     unsigned long    i,ii,iii=1;
273
274     unsigned long    rra_begin;          /* byte pointer to the rra
275                                           * area in the rrd file.  this
276                                           * pointer never changes value */
277     unsigned long    rra_start;          /* byte pointer to the rra
278                                           * area in the rrd file.  this
279                                           * pointer changes as each rrd is
280                                           * processed. */
281     unsigned long    rra_current;        /* byte pointer to the current write
282                                           * spot in the rrd file. */
283     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
284     double           interval,
285         pre_int,post_int;                /* interval between this and
286                                           * the last run */
287     unsigned long    proc_pdp_st;        /* which pdp_st was the last
288                                           * to be processed */
289     unsigned long    occu_pdp_st;        /* when was the pdp_st
290                                           * before the last update
291                                           * time */
292     unsigned long    proc_pdp_age;       /* how old was the data in
293                                           * the pdp prep area when it
294                                           * was last updated */
295     unsigned long    occu_pdp_age;       /* how long ago was the last
296                                           * pdp_step time */
297     rrd_value_t      *pdp_new;           /* prepare the incoming data
298                                           * to be added the the
299                                           * existing entry */
300     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
301                                           * to be added the the
302                                           * cdp values */
303
304     long             *tmpl_idx;          /* index representing the settings
305                                             transported by the template index */
306     unsigned long    tmpl_cnt = 2;       /* time and data */
307
308     FILE             *rrd_file;
309     rrd_t            rrd;
310     time_t           current_time;
311         time_t           rra_time; /* time of update for a RRA */
312     unsigned long    current_time_usec;  /* microseconds part of current time */
313     struct timeval   tmp_time;           /* used for time conversion */
314
315     char             **updvals;
316     int              schedule_smooth = 0;
317         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
318                                          /* a vector of future Holt-Winters seasonal coefs */
319     unsigned long    elapsed_pdp_st;
320                                          /* number of elapsed PDP steps since last update */
321     unsigned long    *rra_step_cnt = NULL;
322                                          /* number of rows to be updated in an RRA for a data
323                                           * value. */
324     unsigned long    start_pdp_offset;
325                                          /* number of PDP steps since the last update that
326                                           * are assigned to the first CDP to be generated
327                                           * since the last update. */
328     unsigned short   scratch_idx;
329                                          /* index into the CDP scratch array */
330     enum cf_en       current_cf;
331                                          /* numeric id of the current consolidation function */
332     rpnstack_t       rpnstack; /* used for COMPUTE DS */
333     int              version;  /* rrd version */
334     char             *endptr; /* used in the conversion */
335
336     rpnstack_init(&rpnstack);
337
338     /* need at least 1 arguments: data. */
339     if (argc < 1) {
340         rrd_set_error("Not enough arguments");
341         return -1;
342     }
343     
344     
345
346     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
347         return -1;
348     }
349     /* initialize time */
350     version = atoi(rrd.stat_head->version);
351     gettimeofday(&tmp_time, 0);
352     normalize_time(&tmp_time);
353     current_time = tmp_time.tv_sec;
354     if(version >= 3) {
355         current_time_usec = tmp_time.tv_usec;
356     }
357     else {
358         current_time_usec = 0;
359     }
360
361     rra_current = rra_start = rra_begin = ftell(rrd_file);
362     /* This is defined in the ANSI C standard, section 7.9.5.3:
363
364         When a file is opened with udpate mode ('+' as the second
365         or third character in the ... list of mode argument
366         variables), both input and ouptut may be performed on the
367         associated stream.  However, ...  input may not be directly
368         followed by output without an intervening call to a file
369         positioning function, unless the input oepration encounters
370         end-of-file. */
371     fseek(rrd_file, 0, SEEK_CUR);
372
373     
374     /* get exclusive lock to whole file.
375      * lock gets removed when we close the file.
376      */
377     if (LockRRD(rrd_file) != 0) {
378       rrd_set_error("could not lock RRD");
379       rrd_free(&rrd);
380       fclose(rrd_file);
381       return(-1);   
382     } 
383
384     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
385         rrd_set_error("allocating updvals pointer array");
386         rrd_free(&rrd);
387         fclose(rrd_file);
388         return(-1);
389     }
390
391     if ((pdp_temp = malloc(sizeof(rrd_value_t)
392                            *rrd.stat_head->ds_cnt))==NULL){
393         rrd_set_error("allocating pdp_temp ...");
394         free(updvals);
395         rrd_free(&rrd);
396         fclose(rrd_file);
397         return(-1);
398     }
399
400     if ((tmpl_idx = malloc(sizeof(unsigned long)
401                            *(rrd.stat_head->ds_cnt+1)))==NULL){
402         rrd_set_error("allocating tmpl_idx ...");
403         free(pdp_temp);
404         free(updvals);
405         rrd_free(&rrd);
406         fclose(rrd_file);
407         return(-1);
408     }
409     /* initialize template redirector */
410     /* default config example (assume DS 1 is a CDEF DS)
411        tmpl_idx[0] -> 0; (time)
412        tmpl_idx[1] -> 1; (DS 0)
413        tmpl_idx[2] -> 3; (DS 2)
414        tmpl_idx[3] -> 4; (DS 3) */
415     tmpl_idx[0] = 0; /* time */
416     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
417         {
418            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
419               tmpl_idx[ii++]=i;
420         }
421     tmpl_cnt= ii;
422
423     if (template) {
424         char *dsname;
425         unsigned int tmpl_len;
426         dsname = template;
427         tmpl_cnt = 1; /* the first entry is the time */
428         tmpl_len = strlen(template);
429         for(i=0;i<=tmpl_len ;i++) {
430             if (template[i] == ':' || template[i] == '\0') {
431                 template[i] = '\0';
432                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
433                     rrd_set_error("Template contains more DS definitions than RRD");
434                     free(updvals); free(pdp_temp);
435                     free(tmpl_idx); rrd_free(&rrd);
436                     fclose(rrd_file); return(-1);
437                 }
438                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
439                     rrd_set_error("unknown DS name '%s'",dsname);
440                     free(updvals); free(pdp_temp);
441                     free(tmpl_idx); rrd_free(&rrd);
442                     fclose(rrd_file); return(-1);
443                 } else {
444                   /* the first element is always the time */
445                   tmpl_idx[tmpl_cnt-1]++; 
446                   /* go to the next entry on the template */
447                   dsname = &template[i+1];
448                   /* fix the damage we did before */
449                   if (i<tmpl_len) {
450                      template[i]=':';
451                   } 
452
453                 }
454             }       
455         }
456     }
457     if ((pdp_new = malloc(sizeof(rrd_value_t)
458                           *rrd.stat_head->ds_cnt))==NULL){
459         rrd_set_error("allocating pdp_new ...");
460         free(updvals);
461         free(pdp_temp);
462         free(tmpl_idx);
463         rrd_free(&rrd);
464         fclose(rrd_file);
465         return(-1);
466     }
467
468     /* loop through the arguments. */
469     for(arg_i=0; arg_i<argc;arg_i++) {
470         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
471         char *step_start = stepper;
472         char *p;
473         char *parsetime_error = NULL;
474         enum {atstyle, normal} timesyntax;
475         struct rrd_time_value ds_tv;
476         if (stepper == NULL){
477                 rrd_set_error("failed duplication argv entry");
478                 free(updvals);
479                 free(pdp_temp);  
480                 free(tmpl_idx);
481                 rrd_free(&rrd);
482                 fclose(rrd_file);
483                 return(-1);
484          }
485         /* initialize all ds input to unknown except the first one
486            which has always got to be set */
487         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
488         strcpy(stepper,argv[arg_i]);
489         updvals[0]=stepper;
490         /* separate all ds elements; first must be examined separately
491            due to alternate time syntax */
492         if ((p=strchr(stepper,'@'))!=NULL) {
493             timesyntax = atstyle;
494             *p = '\0';
495             stepper = p+1;
496         } else if ((p=strchr(stepper,':'))!=NULL) {
497             timesyntax = normal;
498             *p = '\0';
499             stepper = p+1;
500         } else {
501             rrd_set_error("expected timestamp not found in data source from %s:...",
502                           argv[arg_i]);
503             free(step_start);
504             break;
505         }
506         ii=1;
507         updvals[tmpl_idx[ii]] = stepper;
508         while (*stepper) {
509             if (*stepper == ':') {
510                 *stepper = '\0';
511                 ii++;
512                 if (ii<tmpl_cnt){                   
513                     updvals[tmpl_idx[ii]] = stepper+1;
514                 }
515             }
516             stepper++;
517         }
518
519         if (ii != tmpl_cnt-1) {
520             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
521                           tmpl_cnt-1, ii, argv[arg_i]);
522             free(step_start);
523             break;
524         }
525         
526         /* get the time from the reading ... handle N */
527         if (timesyntax == atstyle) {
528             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
529                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
530                 free(step_start);
531                 break;
532             }
533             if (ds_tv.type == RELATIVE_TO_END_TIME ||
534                 ds_tv.type == RELATIVE_TO_START_TIME) {
535                 rrd_set_error("specifying time relative to the 'start' "
536                               "or 'end' makes no sense here: %s",
537                               updvals[0]);
538                 free(step_start);
539                 break;
540             }
541
542             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
543             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
544             
545         } else if (strcmp(updvals[0],"N")==0){
546             gettimeofday(&tmp_time, 0);
547             normalize_time(&tmp_time);
548             current_time = tmp_time.tv_sec;
549             current_time_usec = tmp_time.tv_usec;
550         } else {
551             double tmp;
552             tmp = strtod(updvals[0], 0);
553             current_time = floor(tmp);
554             current_time_usec = (long)((tmp - current_time) * 1000000L);
555         }
556         /* dont do any correction for old version RRDs */
557         if(version < 3) 
558             current_time_usec = 0;
559         
560         if(current_time <= rrd.live_head->last_up){
561             rrd_set_error("illegal attempt to update using time %ld when "
562                           "last update time is %ld (minimum one second step)",
563                           current_time, rrd.live_head->last_up);
564             free(step_start);
565             break;
566         }
567         
568         
569         /* seek to the beginning of the rra's */
570         if (rra_current != rra_begin) {
571             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
572                 rrd_set_error("seek error in rrd");
573                 free(step_start);
574                 break;
575             }
576             rra_current = rra_begin;
577         }
578         rra_start = rra_begin;
579
580         /* when was the current pdp started */
581         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
582         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
583
584         /* when did the last pdp_st occur */
585         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
586         occu_pdp_st = current_time - occu_pdp_age;
587         /* interval = current_time - rrd.live_head->last_up; */
588         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
589     
590         if (occu_pdp_st > proc_pdp_st){
591             /* OK we passed the pdp_st moment*/
592             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
593                                                               * occurred before the latest
594                                                               * pdp_st moment*/
595             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
596             post_int = occu_pdp_age;                         /* how much after it */
597             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
598         } else {
599             pre_int = interval;
600             post_int = 0;
601         }
602
603 #ifdef DEBUG
604         printf(
605                "proc_pdp_age %lu\t"
606                "proc_pdp_st %lu\t" 
607                "occu_pfp_age %lu\t" 
608                "occu_pdp_st %lu\t"
609                "int %lf\t"
610                "pre_int %lf\t"
611                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
612                 occu_pdp_age, occu_pdp_st,
613                interval, pre_int, post_int);
614 #endif
615     
616         /* process the data sources and update the pdp_prep 
617          * area accordingly */
618         for(i=0;i<rrd.stat_head->ds_cnt;i++){
619             enum dst_en dst_idx;
620             dst_idx= dst_conv(rrd.ds_def[i].dst);
621                 /* NOTE: DST_CDEF should never enter this if block, because
622                  * updvals[i+1][0] is initialized to 'U'; unless the caller
623                  * accidently specified a value for the DST_CDEF. To handle 
624                  * this case, an extra check is required. */
625             if((updvals[i+1][0] != 'U') &&
626                    (dst_idx != DST_CDEF) &&
627                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
628                double rate = DNAN;
629                /* the data source type defines how to process the data */
630                 /* pdp_new contains rate * time ... eg the bytes
631                  * transferred during the interval. Doing it this way saves
632                  * a lot of math operations */
633                 
634
635                 switch(dst_idx){
636                 case DST_COUNTER:
637                 case DST_DERIVE:
638                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
639                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
640                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
641                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
642                                  break;
643                             }
644                        }
645                        if (rrd_test_error()){
646                             break;
647                        }
648                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
649                        if(dst_idx == DST_COUNTER) {
650                           /* simple overflow catcher sugestet by andres kroonmaa */
651                           /* this will fail terribly for non 32 or 64 bit counters ... */
652                           /* are there any others in SNMP land ? */
653                           if (pdp_new[i] < (double)0.0 ) 
654                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
655                           if (pdp_new[i] < (double)0.0 ) 
656                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
657                        }
658                        rate = pdp_new[i] / interval;
659                     }
660                    else {
661                      pdp_new[i]= DNAN;          
662                    }
663                    break;
664                 case DST_ABSOLUTE:
665                     errno = 0;
666                     pdp_new[i] = strtod(updvals[i+1],&endptr);
667                     if (errno > 0){
668                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669                         break;
670                     };
671                     if (endptr[0] != '\0'){
672                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673                         break;
674                     }
675                     rate = pdp_new[i] / interval;                 
676                     break;
677                 case DST_GAUGE:
678                     errno = 0;
679                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
680                     if (errno > 0){
681                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
682                         break;
683                     };
684                     if (endptr[0] != '\0'){
685                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
686                         break;
687                     }
688                     rate = pdp_new[i] / interval;                  
689                     break;
690                 default:
691                     rrd_set_error("rrd contains unknown DS type : '%s'",
692                                   rrd.ds_def[i].dst);
693                     break;
694                 }
695                 /* break out of this for loop if the error string is set */
696                 if (rrd_test_error()){
697                     break;
698                 }
699                /* make sure pdp_temp is neither too large or too small
700                 * if any of these occur it becomes unknown ...
701                 * sorry folks ... */
702                if ( ! isnan(rate) && 
703                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
704                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
705                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
706                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
707                   pdp_new[i] = DNAN;
708                }               
709             } else {
710                 /* no news is news all the same */
711                 pdp_new[i] = DNAN;
712             }
713             
714             /* make a copy of the command line argument for the next run */
715 #ifdef DEBUG
716             fprintf(stderr,
717                     "prep ds[%lu]\t"
718                     "last_arg '%s'\t"
719                     "this_arg '%s'\t"
720                     "pdp_new %10.2f\n",
721                     i,
722                     rrd.pdp_prep[i].last_ds,
723                     updvals[i+1], pdp_new[i]);
724 #endif
725             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
726                 strncpy(rrd.pdp_prep[i].last_ds,
727                         updvals[i+1],LAST_DS_LEN-1);
728                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
729             }
730         }
731         /* break out of the argument parsing loop if the error_string is set */
732         if (rrd_test_error()){
733             free(step_start);
734             break;
735         }
736         /* has a pdp_st moment occurred since the last run ? */
737
738         if (proc_pdp_st == occu_pdp_st){
739             /* no we have not passed a pdp_st moment. therefore update is simple */
740
741             for(i=0;i<rrd.stat_head->ds_cnt;i++){
742                 if(isnan(pdp_new[i]))
743                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
744                 else
745                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
746 #ifdef DEBUG
747                 fprintf(stderr,
748                         "NO PDP  ds[%lu]\t"
749                         "value %10.2f\t"
750                         "unkn_sec %5lu\n",
751                         i,
752                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
753                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
754 #endif
755             }   
756         } else {
757             /* a pdp_st has occurred. */
758
759             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
760              * occurred up to the last run.        
761             pdp_new[] contains rate*seconds from the latest run.
762             pdp_temp[] will contain the rate for cdp */
763
764             for(i=0;i<rrd.stat_head->ds_cnt;i++){
765                 /* update pdp_prep to the current pdp_st */
766                 if(isnan(pdp_new[i]))
767                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
768                 else
769                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
770                         pdp_new[i]/(double)interval*(double)pre_int;
771
772                 /* if too much of the pdp_prep is unknown we dump it */
773                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
774                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775                     (occu_pdp_st-proc_pdp_st <= 
776                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
777                     pdp_temp[i] = DNAN;
778                 } else {
779                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780                         / (double)( occu_pdp_st
781                                    - proc_pdp_st
782                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
783                 }
784
785                 /* process CDEF data sources; remember each CDEF DS can
786                  * only reference other DS with a lower index number */
787             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
788                    rpnp_t *rpnp;
789                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790                    /* substitute data values for OP_VARIABLE nodes */
791                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
792                    {
793                           if (rpnp[ii].op == OP_VARIABLE) {
794                                  rpnp[ii].op = OP_NUMBER;
795                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
796                           }
797                    }
798                    /* run the rpn calculator */
799                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
800                           free(rpnp);
801                           break; /* exits the data sources pdp_temp loop */
802                    }
803                 }
804         
805                 /* make pdp_prep ready for the next run */
806                 if(isnan(pdp_new[i])){
807                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
808                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
809                 } else {
810                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
811                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
812                         pdp_new[i]/(double)interval*(double)post_int;
813                 }
814
815 #ifdef DEBUG
816                 fprintf(stderr,
817                         "PDP UPD ds[%lu]\t"
818                         "pdp_temp %10.2f\t"
819                         "new_prep %10.2f\t"
820                         "new_unkn_sec %5lu\n",
821                         i, pdp_temp[i],
822                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
823                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
824 #endif
825             }
826
827                 /* if there were errors during the last loop, bail out here */
828             if (rrd_test_error()){
829                free(step_start);
830                break;
831             }
832
833                 /* compute the number of elapsed pdp_st moments */
834                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
835 #ifdef DEBUG
836                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
837 #endif
838                 if (rra_step_cnt == NULL)
839                 {
840                    rra_step_cnt = (unsigned long *) 
841                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
842                 }
843
844             for(i = 0, rra_start = rra_begin;
845                 i < rrd.stat_head->rra_cnt;
846             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
847                 i++)
848                 {
849                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
850                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
851                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
852         if (start_pdp_offset <= elapsed_pdp_st) {
853            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
854                       rrd.rra_def[i].pdp_cnt + 1;
855             } else {
856                    rra_step_cnt[i] = 0;
857                 }
858
859                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
860                 {
861                    /* If this is a bulk update, we need to skip ahead in the seasonal
862                         * arrays so that they will be correct for the next observed value;
863                         * note that for the bulk update itself, no update will occur to
864                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
865                         * be set to DNAN. */
866            if (rra_step_cnt[i] > 2) 
867                    {
868                           /* skip update by resetting rra_step_cnt[i],
869                            * note that this is not data source specific; this is due
870                            * to the bulk update, not a DNAN value for the specific data
871                            * source. */
872                           rra_step_cnt[i] = 0;
873               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
874                              &last_seasonal_coef);
875                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
876                              &seasonal_coef);
877                    }
878                 
879                   /* periodically run a smoother for seasonal effects */
880                   /* Need to use first cdp parameter buffer to track
881                    * burnin (burnin requires a specific smoothing schedule).
882                    * The CDP_init_seasonal parameter is really an RRA level,
883                    * not a data source within RRA level parameter, but the rra_def
884                    * is read only for rrd_update (not flushed to disk). */
885                   iii = i*(rrd.stat_head -> ds_cnt);
886                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
887                           <= BURNIN_CYCLES)
888                   {
889                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
890                                  > rrd.rra_def[i].row_cnt - 1) {
891                            /* mark off one of the burnin cycles */
892                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
893                        schedule_smooth = 1;
894                          }  
895                   } else {
896                          /* someone has no doubt invented a trick to deal with this
897                           * wrap around, but at least this code is clear. */
898                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
899                              rrd.rra_ptr[i].cur_row)
900                          {
901                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
902                                   * mapping between PDP and CDP */
903                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
904                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
905                                  {
906 #ifdef DEBUG
907                                         fprintf(stderr,
908                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
909                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
910                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
911 #endif
912                                         schedule_smooth = 1;
913                                  }
914              } else {
915                                  /* can't rely on negative numbers because we are working with
916                                   * unsigned values */
917                                  /* Don't need modulus here. If we've wrapped more than once, only
918                                   * one smooth is executed at the end. */
919                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
920                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
921                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
922                                  {
923 #ifdef DEBUG
924                                         fprintf(stderr,
925                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
926                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
927                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
928 #endif
929                                         schedule_smooth = 1;
930                                  }
931                          }
932                   }
933
934               rra_current = ftell(rrd_file); 
935                 } /* if cf is DEVSEASONAL or SEASONAL */
936
937         if (rrd_test_error()) break;
938
939                     /* update CDP_PREP areas */
940                     /* loop over data sources within each RRA */
941                     for(ii = 0;
942                         ii < rrd.stat_head->ds_cnt;
943                         ii++)
944                         {
945                         
946                         /* iii indexes the CDP prep area for this data source within the RRA */
947                         iii=i*rrd.stat_head->ds_cnt+ii;
948
949                         if (rrd.rra_def[i].pdp_cnt > 1) {
950                           
951                            if (rra_step_cnt[i] > 0) {
952                            /* If we are in this block, at least 1 CDP value will be written to
953                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
954                                 * to be written, then the "fill in" value is the CDP_secondary_val
955                                 * entry. */
956                                   if (isnan(pdp_temp[ii]))
957                   {
958                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
959                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
960                                   } else {
961                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
962                                           * CDP data entries. No matter the CF, the value is the same because
963                                           * the average, max, min, and last of a list of identical values is
964                                           * the same, namely, the value itself. */
965                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
966                                   }
967                      
968                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
969                                       > rrd.rra_def[i].pdp_cnt*
970                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
971                                   {
972                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
973                                          /* initialize carry over */
974                                          if (current_cf == CF_AVERAGE) {
975                                                    if (isnan(pdp_temp[ii])) { 
976                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
977                                                    } else {
978                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
979                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
980                                                    }
981                                          } else {
982                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
983                                          }
984                                   } else {
985                                          rrd_value_t cum_val, cur_val; 
986                                      switch (current_cf) {
987                                                 case CF_AVERAGE:
988                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
989                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
990                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
991                                                (cum_val + cur_val * start_pdp_offset) /
992                                            (rrd.rra_def[i].pdp_cnt
993                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
994                                                    /* initialize carry over value */
995                                                    if (isnan(pdp_temp[ii])) { 
996                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
997                                                    } else {
998                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
999                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1000                                                    }
1001                                                    break;
1002                                                 case CF_MAXIMUM:
1003                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1004                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1005 #ifdef DEBUG
1006                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1007                                                           isnan(pdp_temp[ii])) {
1008                                                      fprintf(stderr,
1009                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1010                                                                 i,ii);
1011                                                          exit(-1);
1012                                                   }
1013 #endif
1014                                                   if (cur_val > cum_val)
1015                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1016                                                   else
1017                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1018                                                   /* initialize carry over value */
1019                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1020                                                   break;
1021                                                 case CF_MINIMUM:
1022                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1023                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1024 #ifdef DEBUG
1025                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1026                                                           isnan(pdp_temp[ii])) {
1027                                                      fprintf(stderr,
1028                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1029                                                                 i,ii);
1030                                                          exit(-1);
1031                                                   }
1032 #endif
1033                                                   if (cur_val < cum_val)
1034                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1035                                                   else
1036                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1037                                                   /* initialize carry over value */
1038                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1039                                                   break;
1040                                                 case CF_LAST:
1041                                                 default:
1042                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1043                                                    /* initialize carry over value */
1044                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045                                                 break;
1046                                          }
1047                                   } /* endif meets xff value requirement for a valid value */
1048                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1049                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1050                                   if (isnan(pdp_temp[ii]))
1051                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1052                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1053                                   else
1054                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1055                } else  /* rra_step_cnt[i]  == 0 */
1056                            {
1057 #ifdef DEBUG
1058                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1059                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1060                                          i,ii);
1061                                   } else {
1062                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1063                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1064                                   }
1065 #endif
1066                                   if (isnan(pdp_temp[ii])) {
1067                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1068                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1069                                   {
1070                                          if (current_cf == CF_AVERAGE) {
1071                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1072                                                    elapsed_pdp_st;
1073                                          } else {
1074                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1075                                          }
1076 #ifdef DEBUG
1077                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1078                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1079 #endif
1080                                   } else {
1081                                          switch (current_cf) {
1082                                          case CF_AVERAGE:
1083                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1084                                                    elapsed_pdp_st;
1085                                                 break;
1086                                          case CF_MINIMUM:
1087                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1088                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1089                                                 break; 
1090                                          case CF_MAXIMUM:
1091                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1092                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1093                                                 break; 
1094                                          case CF_LAST:
1095                                          default:
1096                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1097                                                 break;
1098                                          }
1099                                   }
1100                            }
1101                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1102                            if (elapsed_pdp_st > 2)
1103                            {
1104                                    switch (current_cf) {
1105                                    case CF_AVERAGE:
1106                                    default:
1107                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1108                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1109                                           break;
1110                    case CF_SEASONAL:
1111                                    case CF_DEVSEASONAL:
1112                                           /* need to update cached seasonal values, so they are consistent
1113                                            * with the bulk update */
1114                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1115                                            * CDP_last_deviation are the same. */
1116                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1117                                                  last_seasonal_coef[ii];
1118                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1119                                                  seasonal_coef[ii];
1120                                           break;
1121                    case CF_HWPREDICT:
1122                                           /* need to update the null_count and last_null_count.
1123                                            * even do this for non-DNAN pdp_temp because the
1124                                            * algorithm is not learning from batch updates. */
1125                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1126                                                  elapsed_pdp_st;
1127                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1128                                                  elapsed_pdp_st - 1;
1129                                           /* fall through */
1130                                    case CF_DEVPREDICT:
1131                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1132                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1133                                           break;
1134                    case CF_FAILURES:
1135                                           /* do not count missed bulk values as failures */
1136                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1137                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1138                                           /* need to reset violations buffer.
1139                                            * could do this more carefully, but for now, just
1140                                            * assume a bulk update wipes away all violations. */
1141                       erase_violations(&rrd, iii, i);
1142                                           break;
1143                                    }
1144                            } 
1145                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1146
1147                         if (rrd_test_error()) break;
1148
1149                         } /* endif data sources loop */
1150         } /* end RRA Loop */
1151
1152                 /* this loop is only entered if elapsed_pdp_st < 3 */
1153                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1154                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1155                 {
1156                for(i = 0, rra_start = rra_begin;
1157                    i < rrd.stat_head->rra_cnt;
1158                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1159                    i++)
1160                    {
1161                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1162
1163                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1164                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1165                           {
1166                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1167                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1168                                 &seasonal_coef);
1169                  rra_current = ftell(rrd_file);
1170                           }
1171                           if (rrd_test_error()) break;
1172                       /* loop over data sources within each RRA */
1173                       for(ii = 0;
1174                           ii < rrd.stat_head->ds_cnt;
1175                           ii++)
1176                           {
1177                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1178                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1179                                     scratch_idx, seasonal_coef);
1180                           }
1181            } /* end RRA Loop */
1182                    if (rrd_test_error()) break;
1183             } /* end elapsed_pdp_st loop */
1184
1185                 if (rrd_test_error()) break;
1186
1187                 /* Ready to write to disk */
1188                 /* Move sequentially through the file, writing one RRA at a time.
1189                  * Note this architecture divorces the computation of CDP with
1190                  * flushing updated RRA entries to disk. */
1191             for(i = 0, rra_start = rra_begin;
1192                 i < rrd.stat_head->rra_cnt;
1193             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1194                 i++) {
1195                 /* is there anything to write for this RRA? If not, continue. */
1196         if (rra_step_cnt[i] == 0) continue;
1197
1198                 /* write the first row */
1199 #ifdef DEBUG
1200         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1201 #endif
1202             rrd.rra_ptr[i].cur_row++;
1203             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1204                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1205                 /* position on the first row */
1206                 rra_pos_tmp = rra_start +
1207                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1208                 if(rra_pos_tmp != rra_current) {
1209                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1210                       rrd_set_error("seek error in rrd");
1211                       break;
1212                    }
1213                    rra_current = rra_pos_tmp;
1214                 }
1215
1216 #ifdef DEBUG
1217             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1218 #endif
1219                 scratch_idx = CDP_primary_val;
1220                 if (pcdp_summary != NULL)
1221                 {
1222                    rra_time = (current_time - current_time 
1223                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1224                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1225                 }
1226                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1227                    pcdp_summary, &rra_time);
1228                 if (rrd_test_error()) break;
1229
1230                 /* write other rows of the bulk update, if any */
1231                 scratch_idx = CDP_secondary_val;
1232                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1233                 {
1234                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1235                    {
1236 #ifdef DEBUG
1237               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1238                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1239 #endif
1240                           /* wrap */
1241                           rrd.rra_ptr[i].cur_row = 0;
1242                           /* seek back to beginning of current rra */
1243                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1244                           {
1245                          rrd_set_error("seek error in rrd");
1246                          break;
1247                           }
1248 #ifdef DEBUG
1249                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1250 #endif
1251                           rra_current = rra_start;
1252                    }
1253                    if (pcdp_summary != NULL)
1254                    {
1255                       rra_time = (current_time - current_time 
1256                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1257                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1258                    }
1259                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1260                       pcdp_summary, &rra_time);
1261                 }
1262                 
1263                 if (rrd_test_error())
1264                   break;
1265                 } /* RRA LOOP */
1266
1267             /* break out of the argument parsing loop if error_string is set */
1268             if (rrd_test_error()){
1269                    free(step_start);
1270                    break;
1271             } 
1272             
1273         } /* endif a pdp_st has occurred */ 
1274         rrd.live_head->last_up = current_time;
1275         rrd.live_head->last_up_usec = current_time_usec; 
1276         free(step_start);
1277     } /* function argument loop */
1278
1279     if (seasonal_coef != NULL) free(seasonal_coef);
1280     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1281         if (rra_step_cnt != NULL) free(rra_step_cnt);
1282     rpnstack_free(&rpnstack);
1283
1284     /* if we got here and if there is an error and if the file has not been
1285      * written to, then close things up and return. */
1286     if (rrd_test_error()) {
1287         free(updvals);
1288         free(tmpl_idx);
1289         rrd_free(&rrd);
1290         free(pdp_temp);
1291         free(pdp_new);
1292         fclose(rrd_file);
1293         return(-1);
1294     }
1295
1296     /* aargh ... that was tough ... so many loops ... anyway, its done.
1297      * we just need to write back the live header portion now*/
1298
1299     if (fseek(rrd_file, (sizeof(stat_head_t)
1300                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1301                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1302               SEEK_SET) != 0) {
1303         rrd_set_error("seek rrd for live header writeback");
1304         free(updvals);
1305         free(tmpl_idx);
1306         rrd_free(&rrd);
1307         free(pdp_temp);
1308         free(pdp_new);
1309         fclose(rrd_file);
1310         return(-1);
1311     }
1312
1313     if(version >= 3) {
1314             if(fwrite( rrd.live_head,
1315                        sizeof(live_head_t), 1, rrd_file) != 1){
1316                 rrd_set_error("fwrite live_head to rrd");
1317                 free(updvals);
1318                 rrd_free(&rrd);
1319                 free(tmpl_idx);
1320                 free(pdp_temp);
1321                 free(pdp_new);
1322                 fclose(rrd_file);
1323                 return(-1);
1324             }
1325     }
1326     else {
1327             if(fwrite( &rrd.live_head->last_up,
1328                        sizeof(time_t), 1, rrd_file) != 1){
1329                 rrd_set_error("fwrite live_head to rrd");
1330                 free(updvals);
1331                 rrd_free(&rrd);
1332                 free(tmpl_idx);
1333                 free(pdp_temp);
1334                 free(pdp_new);
1335                 fclose(rrd_file);
1336                 return(-1);
1337             }
1338     }
1339             
1340
1341     if(fwrite( rrd.pdp_prep,
1342                sizeof(pdp_prep_t),
1343                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1344         rrd_set_error("ftwrite pdp_prep to rrd");
1345         free(updvals);
1346         rrd_free(&rrd);
1347         free(tmpl_idx);
1348         free(pdp_temp);
1349         free(pdp_new);
1350         fclose(rrd_file);
1351         return(-1);
1352     }
1353
1354     if(fwrite( rrd.cdp_prep,
1355                sizeof(cdp_prep_t),
1356                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1357        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1358
1359         rrd_set_error("ftwrite cdp_prep to rrd");
1360         free(updvals);
1361         free(tmpl_idx);
1362         rrd_free(&rrd);
1363         free(pdp_temp);
1364         free(pdp_new);
1365         fclose(rrd_file);
1366         return(-1);
1367     }
1368
1369     if(fwrite( rrd.rra_ptr,
1370                sizeof(rra_ptr_t), 
1371                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1372         rrd_set_error("fwrite rra_ptr to rrd");
1373         free(updvals);
1374         free(tmpl_idx);
1375         rrd_free(&rrd);
1376         free(pdp_temp);
1377         free(pdp_new);
1378         fclose(rrd_file);
1379         return(-1);
1380     }
1381
1382     /* OK now close the files and free the memory */
1383     if(fclose(rrd_file) != 0){
1384         rrd_set_error("closing rrd");
1385         free(updvals);
1386         free(tmpl_idx);
1387         rrd_free(&rrd);
1388         free(pdp_temp);
1389         free(pdp_new);
1390         return(-1);
1391     }
1392
1393     /* calling the smoothing code here guarantees at most
1394          * one smoothing operation per rrd_update call. Unfortunately,
1395          * it is possible with bulk updates, or a long-delayed update
1396          * for smoothing to occur off-schedule. This really isn't
1397          * critical except during the burning cycles. */
1398         if (schedule_smooth)
1399         {
1400 #ifndef WIN32
1401           rrd_file = fopen(filename,"r+");
1402 #else
1403           rrd_file = fopen(filename,"rb+");
1404 #endif
1405           rra_start = rra_begin;
1406           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1407           {
1408             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1409                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1410             {
1411 #ifdef DEBUG
1412               fprintf(stderr,"Running smoother for rra %ld\n",i);
1413 #endif
1414               apply_smoother(&rrd,i,rra_start,rrd_file);
1415               if (rrd_test_error())
1416                 break;
1417             }
1418             rra_start += rrd.rra_def[i].row_cnt
1419               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1420           }
1421           fclose(rrd_file);
1422         }
1423     rrd_free(&rrd);
1424     free(updvals);
1425     free(tmpl_idx);
1426     free(pdp_new);
1427     free(pdp_temp);
1428     return(0);
1429 }
1430
/*
 * Try to take an exclusive lock on the whole RRD file without waiting.
 *
 * The lock is requested on the descriptor behind rrdfile; it gets removed
 * when the file is closed.
 *
 *   - non-WIN32: fcntl(F_SETLK) write lock starting at offset 0 with
 *     length 0 (i.e. the whole file).
 *   - WIN32:     _locking(_LK_NBLCK) over st_size bytes as reported by
 *     _fstat().
 *
 * Returns the result of the underlying locking call: 0 on success.
 * On WIN32, -1 is returned if the file size cannot be determined.
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);

#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;  /* offsets are relative to start of file */
    whole_file.l_start  = 0;         /* begin at the first byte ... */
    whole_file.l_len    = 0;         /* ... and cover the whole file */
    whole_file.l_type   = F_WRLCK;   /* exclusive write lock */

    return fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* without the file size we do not know how many bytes to lock */
    if (_fstat(fd, &info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, info.st_size);
#endif
}
1467
1468
1469 info_t
1470 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1471                unsigned short CDP_scratch_idx, FILE *rrd_file,
1472                    info_t *pcdp_summary, time_t *rra_time)
1473 {
1474    unsigned long ds_idx, cdp_idx;
1475    infoval iv;
1476   
1477    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1478    {
1479       /* compute the cdp index */
1480       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1481 #ifdef DEBUG
1482           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1483              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1484              rrd -> rra_def[rra_idx].cf_nam);
1485 #endif 
1486       if (pcdp_summary != NULL)
1487           {
1488              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1489              /* append info to the return hash */
1490                  pcdp_summary = info_push(pcdp_summary,
1491                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1492                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1493                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1494          RD_I_VAL, iv);
1495           }
1496           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1497                  sizeof(rrd_value_t),1,rrd_file) != 1)
1498           { 
1499              rrd_set_error("writing rrd");
1500              return 0;
1501           }
1502           *rra_current += sizeof(rrd_value_t);
1503         }
1504         return (pcdp_summary);
1505 }