fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2004
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.16  2004/05/25 20:52:16  oetiker
9  * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
10  *
11  * Revision 1.15  2004/05/25 20:51:49  oetiker
12  * Update displayed copyright messages to be consistent. -- Mike Slifcak
13  *
14  * Revision 1.14  2003/11/11 19:46:21  oetiker
15  * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
16  *
17  * Revision 1.13  2003/11/11 19:38:03  oetiker
18  * rrd files should NOT change size ever ... bulk update code was buggy.
19  * -- David M. Grimes <dgrimes@navisite.com>
20  *
21  * Revision 1.12  2003/09/04 13:16:12  oetiker
22  * should not assign but compare ... grrrrr
23  *
24  * Revision 1.11  2003/09/02 21:58:35  oetiker
25  * be pickier about what we accept in rrd_update. Complain if things do not work out
26  *
27  * Revision 1.10  2003/04/29 19:14:12  jake
28  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
29  * Also revert accidental addition of -I to aclocal MakeMakefile.
30  *
31  * Revision 1.9  2003/04/25 18:35:08  jake
32  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
33  *
34  * Revision 1.8  2003/03/31 21:22:12  oetiker
35  * enables RRDtool updates with microsecond or in case of windows millisecond
36  * precision. This is needed to reduce time measurement error when archive step
37  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
38  *
39  * Revision 1.7  2003/02/13 07:05:27  oetiker
40  * Find attached the patch I promised to send to you. Please note that there
41  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
42  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
43  * library is identical to librrd, but it contains support code for per-thread
44  * global variables currently used for error information only. This is similar
45  * to how errno per-thread variables are implemented.  librrd_th must be linked
46  * alongside of libpthread
47  *
48  * There is also a new file "THREADS", holding some documentation.
49  *
50  * -- Peter Stamfest <peter@stamfest.at>
51  *
52  * Revision 1.6  2002/02/01 20:34:49  oetiker
53  * fixed version number and date/time
54  *
55  * Revision 1.5  2001/05/09 05:31:01  oetiker
56  * Bug fix: when update of multiple PDP/CDP RRAs coincided
57  * with interpolation of multiple PDPs an incorrect value was
58  * stored as the CDP. Especially evident for GAUGE data sources.
59  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
60  *
61  * Revision 1.4  2001/03/10 23:54:41  oetiker
62  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
63  * parser and calculator from rrd_graph and puts them in a new file,
64  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
65  * clean-up of aberrant behavior stuff, including a bug fix.
66  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
67  * -- Jake Brutlag <jakeb@corp.webtv.net>
68  *
69  * Revision 1.3  2001/03/04 13:01:55  oetiker
70  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
71  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
72  * This is backwards compatible! But new files using the Aberrant stuff are not readable
73  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
74  * -- Jake Brutlag <jakeb@corp.webtv.net>
75  *
76  * Revision 1.2  2001/03/04 11:14:25  oetiker
77  * added at-style-time@value:value syntax to rrd_update
78  * --  Dave Bodenstab <imdave@mcs.net>
79  *
80  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
81  * checkin
82  *
83  *****************************************************************************/
84
85 #include "rrd_tool.h"
86 #include <sys/types.h>
87 #include <fcntl.h>
88
89 #ifdef WIN32
90  #include <sys/locking.h>
91  #include <sys/stat.h>
92  #include <io.h>
93 #endif
94
95 #include "rrd_hw.h"
96 #include "rrd_rpncalc.h"
97
98 #include "rrd_is_thread_safe.h"
99
100 #ifdef WIN32
101 /*
102  * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
103  * replacement.
104  */
105 #include <sys/timeb.h>
106
/* Minimal stand-in for struct timeval, which this WIN32 build lacks. */
struct timeval {
        time_t tv_sec; /* seconds */
        long tv_usec;  /* microseconds */
};

/* Placeholder for the timezone argument of gettimeofday(); the
 * replacement below never reads or writes it. */
struct __timezone {
        int  tz_minuteswest; /* minutes W of Greenwich */
        int  tz_dsttime;     /* type of dst correction */
};

/*
 * Quick and dirty gettimeofday() replacement built on _ftime().
 *
 * t  - receives the current time. tv_usec is computed from the
 *      millisecond field of struct timeb, so it only ever holds
 *      multiples of 1000.
 * tz - accepted for call compatibility only; ignored.
 *
 * Always returns 0.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz) {
        struct timeb current_time;

        (void)tz; /* unused */

        _ftime(&current_time);

        t->tv_sec  = current_time.time;
        t->tv_usec = current_time.millitm * 1000;

        return 0;
}
126
127 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must always be >= 0. A negative tv_usec borrows exactly one second
 * from tv_sec; non-negative values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;

        t->tv_sec -= 1;
        t->tv_usec = t->tv_usec + 1000000L;
}
139
140 /* Local prototypes */
141 int LockRRD(FILE *rrd_file);
142 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
143                                         unsigned long *rra_current,
144                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
145                                         info_t *pcdp_summary, time_t *rra_time);
146 int rrd_update_r(char *filename, char *template, int argc, char **argv);
147 int _rrd_update(char *filename, char *template, int argc, char **argv, 
148                                         info_t*);
149
/*
 * IFDNAN(X,Y): yields (Y) when (X) is NaN, otherwise (X).
 *
 * X is evaluated twice, so do not pass an expression with side effects.
 * The expansion deliberately carries no trailing semicolon, so the macro
 * can be used inside larger expressions and in if/else bodies; callers
 * terminate the statement themselves.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
151
152
153 #ifdef STANDALONE
/*
 * Entry point of the STANDALONE build: runs rrd_update() on the command
 * line. If an rrd error is pending afterwards, prints the usage text
 * followed by the error message, clears the error and exits with 1;
 * otherwise exits with 0.
 */
int
main(int argc, char **argv)
{
        rrd_update(argc, argv);

        if (!rrd_test_error()) {
                return 0;
        }

        printf("RRDtool 1.1.x  Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
               "Usage: rrdupdate filename\n"
               "\t\t\t[--template|-t ds-name:ds-name:...]\n"
               "\t\t\ttime|N:value[:value...]\n\n"
               "\t\t\tat-time@value[:value...]\n\n"
               "\t\t\t[ time:value[:value...] ..]\n\n");
        printf("ERROR: %s\n", rrd_get_error());
        rrd_clear_error();

        return 1;
}
171 #endif
172
173 info_t *rrd_update_v(int argc, char **argv)
174 {
175     char             *template = NULL;          
176         info_t *result = NULL;
177         infoval rc;
178
179     while (1) {
180                 static struct option long_options[] =
181                         {
182                                 {"template",      required_argument, 0, 't'},
183                                 {0,0,0,0}
184                         };
185                 int option_index = 0;
186                 int opt;
187                 opt = getopt_long(argc, argv, "t:", 
188                                                   long_options, &option_index);
189                 
190                 if (opt == EOF)
191                         break;
192                 
193                 switch(opt) {
194                 case 't':
195                         template = optarg;
196                         break;
197                 
198                 case '?':
199                         rrd_set_error("unknown option '%s'",argv[optind-1]);
200             rc.u_int = -1;
201                         goto end_tag;
202                 }
203     }
204
205     /* need at least 2 arguments: filename, data. */
206     if (argc-optind < 2) {
207                 rrd_set_error("Not enough arguments");
208         rc.u_int = -1;
209                 goto end_tag;
210     }
211     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
212         rc.u_int = _rrd_update(argv[optind], template,
213                       argc - optind - 1, argv + optind + 1, result);
214     result->value.u_int = rc.u_int;
215 end_tag:
216     return result;
217 }
218
/*
 * Command-line front end for updating an RRD.
 *
 * Parses the optional --template/-t option, then expects at least two
 * remaining arguments: the file name followed by one or more data
 * arguments. These are passed on to rrd_update_r().
 *
 * Returns -1 with the rrd error set on an unknown option or when too few
 * arguments remain; otherwise returns whatever rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
        {
            {"template",      required_argument, 0, 't'},
            {0,0,0,0}
        };
    char *ds_template = NULL;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index);

        if (opt == EOF) {
            break;
        }

        if (opt == 't') {
            ds_template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* filename plus at least one data argument must remain */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], ds_template,
                        argc - optind - 1, argv + optind + 1);
}
261
262 int
263 rrd_update_r(char *filename, char *template, int argc, char **argv)
264 {
265    return _rrd_update(filename, template, argc, argv, NULL);
266 }
267
268 int
269 _rrd_update(char *filename, char *template, int argc, char **argv, 
270    info_t *pcdp_summary)
271 {
272
273     int              arg_i = 2;
274     short            j;
275     unsigned long    i,ii,iii=1;
276
277     unsigned long    rra_begin;          /* byte pointer to the rra
278                                           * area in the rrd file.  this
279                                           * pointer never changes value */
280     unsigned long    rra_start;          /* byte pointer to the rra
281                                           * area in the rrd file.  this
282                                           * pointer changes as each rrd is
283                                           * processed. */
284     unsigned long    rra_current;        /* byte pointer to the current write
285                                           * spot in the rrd file. */
286     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
287     double           interval,
288         pre_int,post_int;                /* interval between this and
289                                           * the last run */
290     unsigned long    proc_pdp_st;        /* which pdp_st was the last
291                                           * to be processed */
292     unsigned long    occu_pdp_st;        /* when was the pdp_st
293                                           * before the last update
294                                           * time */
295     unsigned long    proc_pdp_age;       /* how old was the data in
296                                           * the pdp prep area when it
297                                           * was last updated */
298     unsigned long    occu_pdp_age;       /* how long ago was the last
299                                           * pdp_step time */
300     rrd_value_t      *pdp_new;           /* prepare the incoming data
301                                           * to be added the the
302                                           * existing entry */
303     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
304                                           * to be added the the
305                                           * cdp values */
306
307     long             *tmpl_idx;          /* index representing the settings
308                                             transported by the template index */
309     unsigned long    tmpl_cnt = 2;       /* time and data */
310
311     FILE             *rrd_file;
312     rrd_t            rrd;
313     time_t           current_time;
314         time_t           rra_time; /* time of update for a RRA */
315     unsigned long    current_time_usec;  /* microseconds part of current time */
316     struct timeval   tmp_time;           /* used for time conversion */
317
318     char             **updvals;
319     int              schedule_smooth = 0;
320         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
321                                          /* a vector of future Holt-Winters seasonal coefs */
322     unsigned long    elapsed_pdp_st;
323                                          /* number of elapsed PDP steps since last update */
324     unsigned long    *rra_step_cnt = NULL;
325                                          /* number of rows to be updated in an RRA for a data
326                                           * value. */
327     unsigned long    start_pdp_offset;
328                                          /* number of PDP steps since the last update that
329                                           * are assigned to the first CDP to be generated
330                                           * since the last update. */
331     unsigned short   scratch_idx;
332                                          /* index into the CDP scratch array */
333     enum cf_en       current_cf;
334                                          /* numeric id of the current consolidation function */
335     rpnstack_t       rpnstack; /* used for COMPUTE DS */
336     int              version;  /* rrd version */
337     char             *endptr; /* used in the conversion */
338
339     rpnstack_init(&rpnstack);
340
341     /* need at least 1 arguments: data. */
342     if (argc < 1) {
343         rrd_set_error("Not enough arguments");
344         return -1;
345     }
346     
347     
348
349     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
350         return -1;
351     }
352     /* initialize time */
353     version = atoi(rrd.stat_head->version);
354     gettimeofday(&tmp_time, 0);
355     normalize_time(&tmp_time);
356     current_time = tmp_time.tv_sec;
357     if(version >= 3) {
358         current_time_usec = tmp_time.tv_usec;
359     }
360     else {
361         current_time_usec = 0;
362     }
363
364     rra_current = rra_start = rra_begin = ftell(rrd_file);
365     /* This is defined in the ANSI C standard, section 7.9.5.3:
366
367         When a file is opened with update mode ('+' as the second
368         or third character in the ... list of mode argument
369         variables), both input and output may be performed on the
370         associated stream.  However, ...  input may not be directly
371         followed by output without an intervening call to a file
372         positioning function, unless the input operation encounters
373         end-of-file. */
374     fseek(rrd_file, 0, SEEK_CUR);
375
376     
377     /* get exclusive lock to whole file.
378      * lock gets removed when we close the file.
379      */
380     if (LockRRD(rrd_file) != 0) {
381       rrd_set_error("could not lock RRD");
382       rrd_free(&rrd);
383       fclose(rrd_file);
384       return(-1);   
385     } 
386
387     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
388         rrd_set_error("allocating updvals pointer array");
389         rrd_free(&rrd);
390         fclose(rrd_file);
391         return(-1);
392     }
393
394     if ((pdp_temp = malloc(sizeof(rrd_value_t)
395                            *rrd.stat_head->ds_cnt))==NULL){
396         rrd_set_error("allocating pdp_temp ...");
397         free(updvals);
398         rrd_free(&rrd);
399         fclose(rrd_file);
400         return(-1);
401     }
402
403     if ((tmpl_idx = malloc(sizeof(unsigned long)
404                            *(rrd.stat_head->ds_cnt+1)))==NULL){
405         rrd_set_error("allocating tmpl_idx ...");
406         free(pdp_temp);
407         free(updvals);
408         rrd_free(&rrd);
409         fclose(rrd_file);
410         return(-1);
411     }
412     /* initialize template redirector */
413     /* default config example (assume DS 1 is a CDEF DS)
414        tmpl_idx[0] -> 0; (time)
415        tmpl_idx[1] -> 1; (DS 0)
416        tmpl_idx[2] -> 3; (DS 2)
417        tmpl_idx[3] -> 4; (DS 3) */
418     tmpl_idx[0] = 0; /* time */
419     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
420         {
421            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
422               tmpl_idx[ii++]=i;
423         }
424     tmpl_cnt= ii;
425
426     if (template) {
427         char *dsname;
428         unsigned int tmpl_len;
429         dsname = template;
430         tmpl_cnt = 1; /* the first entry is the time */
431         tmpl_len = strlen(template);
432         for(i=0;i<=tmpl_len ;i++) {
433             if (template[i] == ':' || template[i] == '\0') {
434                 template[i] = '\0';
435                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
436                     rrd_set_error("Template contains more DS definitions than RRD");
437                     free(updvals); free(pdp_temp);
438                     free(tmpl_idx); rrd_free(&rrd);
439                     fclose(rrd_file); return(-1);
440                 }
441                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
442                     rrd_set_error("unknown DS name '%s'",dsname);
443                     free(updvals); free(pdp_temp);
444                     free(tmpl_idx); rrd_free(&rrd);
445                     fclose(rrd_file); return(-1);
446                 } else {
447                   /* the first element is always the time */
448                   tmpl_idx[tmpl_cnt-1]++; 
449                   /* go to the next entry on the template */
450                   dsname = &template[i+1];
451                   /* fix the damage we did before */
452                   if (i<tmpl_len) {
453                      template[i]=':';
454                   } 
455
456                 }
457             }       
458         }
459     }
460     if ((pdp_new = malloc(sizeof(rrd_value_t)
461                           *rrd.stat_head->ds_cnt))==NULL){
462         rrd_set_error("allocating pdp_new ...");
463         free(updvals);
464         free(pdp_temp);
465         free(tmpl_idx);
466         rrd_free(&rrd);
467         fclose(rrd_file);
468         return(-1);
469     }
470
471     /* loop through the arguments. */
472     for(arg_i=0; arg_i<argc;arg_i++) {
473         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
474         char *step_start = stepper;
475         char *p;
476         char *parsetime_error = NULL;
477         enum {atstyle, normal} timesyntax;
478         struct rrd_time_value ds_tv;
479         if (stepper == NULL){
480                 rrd_set_error("failed duplication argv entry");
481                 free(updvals);
482                 free(pdp_temp);  
483                 free(tmpl_idx);
484                 rrd_free(&rrd);
485                 fclose(rrd_file);
486                 return(-1);
487          }
488         /* initialize all ds input to unknown except the first one
489            which has always got to be set */
490         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
491         strcpy(stepper,argv[arg_i]);
492         updvals[0]=stepper;
493         /* separate all ds elements; first must be examined separately
494            due to alternate time syntax */
495         if ((p=strchr(stepper,'@'))!=NULL) {
496             timesyntax = atstyle;
497             *p = '\0';
498             stepper = p+1;
499         } else if ((p=strchr(stepper,':'))!=NULL) {
500             timesyntax = normal;
501             *p = '\0';
502             stepper = p+1;
503         } else {
504             rrd_set_error("expected timestamp not found in data source from %s:...",
505                           argv[arg_i]);
506             free(step_start);
507             break;
508         }
509         ii=1;
510         updvals[tmpl_idx[ii]] = stepper;
511         while (*stepper) {
512             if (*stepper == ':') {
513                 *stepper = '\0';
514                 ii++;
515                 if (ii<tmpl_cnt){                   
516                     updvals[tmpl_idx[ii]] = stepper+1;
517                 }
518             }
519             stepper++;
520         }
521
522         if (ii != tmpl_cnt-1) {
523             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
524                           tmpl_cnt-1, ii, argv[arg_i]);
525             free(step_start);
526             break;
527         }
528         
529         /* get the time from the reading ... handle N */
530         if (timesyntax == atstyle) {
531             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
532                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
533                 free(step_start);
534                 break;
535             }
536             if (ds_tv.type == RELATIVE_TO_END_TIME ||
537                 ds_tv.type == RELATIVE_TO_START_TIME) {
538                 rrd_set_error("specifying time relative to the 'start' "
539                               "or 'end' makes no sense here: %s",
540                               updvals[0]);
541                 free(step_start);
542                 break;
543             }
544
545             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
546             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
547             
548         } else if (strcmp(updvals[0],"N")==0){
549             gettimeofday(&tmp_time, 0);
550             normalize_time(&tmp_time);
551             current_time = tmp_time.tv_sec;
552             current_time_usec = tmp_time.tv_usec;
553         } else {
554             double tmp;
555             tmp = strtod(updvals[0], 0);
556             current_time = floor(tmp);
557             current_time_usec = (long)((tmp - current_time) * 1000000L);
558         }
559         /* dont do any correction for old version RRDs */
560         if(version < 3) 
561             current_time_usec = 0;
562         
563         if(current_time <= rrd.live_head->last_up){
564             rrd_set_error("illegal attempt to update using time %ld when "
565                           "last update time is %ld (minimum one second step)",
566                           current_time, rrd.live_head->last_up);
567             free(step_start);
568             break;
569         }
570         
571         
572         /* seek to the beginning of the rra's */
573         if (rra_current != rra_begin) {
574             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
575                 rrd_set_error("seek error in rrd");
576                 free(step_start);
577                 break;
578             }
579             rra_current = rra_begin;
580         }
581         rra_start = rra_begin;
582
583         /* when was the current pdp started */
584         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
585         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
586
587         /* when did the last pdp_st occur */
588         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
589         occu_pdp_st = current_time - occu_pdp_age;
590         /* interval = current_time - rrd.live_head->last_up; */
591         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
592     
593         if (occu_pdp_st > proc_pdp_st){
594             /* OK we passed the pdp_st moment*/
595             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
596                                                               * occurred before the latest
597                                                               * pdp_st moment*/
598             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
599             post_int = occu_pdp_age;                         /* how much after it */
600             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
601         } else {
602             pre_int = interval;
603             post_int = 0;
604         }
605
606 #ifdef DEBUG
607         printf(
608                "proc_pdp_age %lu\t"
609                "proc_pdp_st %lu\t" 
610                "occu_pfp_age %lu\t" 
611                "occu_pdp_st %lu\t"
612                "int %lf\t"
613                "pre_int %lf\t"
614                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
615                 occu_pdp_age, occu_pdp_st,
616                interval, pre_int, post_int);
617 #endif
618     
619         /* process the data sources and update the pdp_prep 
620          * area accordingly */
621         for(i=0;i<rrd.stat_head->ds_cnt;i++){
622             enum dst_en dst_idx;
623             dst_idx= dst_conv(rrd.ds_def[i].dst);
624                 /* NOTE: DST_CDEF should never enter this if block, because
625                  * updvals[i+1][0] is initialized to 'U'; unless the caller
626                  * accidently specified a value for the DST_CDEF. To handle 
627                  * this case, an extra check is required. */
628             if((updvals[i+1][0] != 'U') &&
629                    (dst_idx != DST_CDEF) &&
630                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
631                double rate = DNAN;
632                /* the data source type defines how to process the data */
633                 /* pdp_new contains rate * time ... eg the bytes
634                  * transferred during the interval. Doing it this way saves
635                  * a lot of math operations */
636                 
637
638                 switch(dst_idx){
639                 case DST_COUNTER:
640                 case DST_DERIVE:
641                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
642                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
643                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
644                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
645                                  break;
646                             }
647                        }
648                        if (rrd_test_error()){
649                             break;
650                        }
651                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
652                        if(dst_idx == DST_COUNTER) {
653                           /* simple overflow catcher suggested by Andres Kroonmaa */
654                           /* this will fail terribly for non 32 or 64 bit counters ... */
655                           /* are there any others in SNMP land ? */
656                           if (pdp_new[i] < (double)0.0 ) 
657                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
658                           if (pdp_new[i] < (double)0.0 ) 
659                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
660                        }
661                        rate = pdp_new[i] / interval;
662                     }
663                    else {
664                      pdp_new[i]= DNAN;          
665                    }
666                    break;
667                 case DST_ABSOLUTE:
668                     errno = 0;
669                     pdp_new[i] = strtod(updvals[i+1],&endptr);
670                     if (errno > 0){
671                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
672                         break;
673                     };
674                     if (endptr[0] != '\0'){
675                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
676                         break;
677                     }
678                     rate = pdp_new[i] / interval;                 
679                     break;
680                 case DST_GAUGE:
681                     errno = 0;
682                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
683                     if (errno > 0){
684                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
685                         break;
686                     };
687                     if (endptr[0] != '\0'){
688                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
689                         break;
690                     }
691                     rate = pdp_new[i] / interval;                  
692                     break;
693                 default:
694                     rrd_set_error("rrd contains unknown DS type : '%s'",
695                                   rrd.ds_def[i].dst);
696                     break;
697                 }
698                 /* break out of this for loop if the error string is set */
699                 if (rrd_test_error()){
700                     break;
701                 }
702                /* make sure pdp_temp is neither too large or too small
703                 * if any of these occur it becomes unknown ...
704                 * sorry folks ... */
705                if ( ! isnan(rate) && 
706                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
707                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
708                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
709                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
710                   pdp_new[i] = DNAN;
711                }               
712             } else {
713                 /* no news is news all the same */
714                 pdp_new[i] = DNAN;
715             }
716             
717             /* make a copy of the command line argument for the next run */
718 #ifdef DEBUG
719             fprintf(stderr,
720                     "prep ds[%lu]\t"
721                     "last_arg '%s'\t"
722                     "this_arg '%s'\t"
723                     "pdp_new %10.2f\n",
724                     i,
725                     rrd.pdp_prep[i].last_ds,
726                     updvals[i+1], pdp_new[i]);
727 #endif
728             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
729                 strncpy(rrd.pdp_prep[i].last_ds,
730                         updvals[i+1],LAST_DS_LEN-1);
731                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
732             }
733         }
734         /* break out of the argument parsing loop if the error_string is set */
735         if (rrd_test_error()){
736             free(step_start);
737             break;
738         }
739         /* has a pdp_st moment occurred since the last run ? */
740
741         if (proc_pdp_st == occu_pdp_st){
742             /* no we have not passed a pdp_st moment. therefore update is simple */
743
744             for(i=0;i<rrd.stat_head->ds_cnt;i++){
745                 if(isnan(pdp_new[i]))
746                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
747                 else
748                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
749 #ifdef DEBUG
750                 fprintf(stderr,
751                         "NO PDP  ds[%lu]\t"
752                         "value %10.2f\t"
753                         "unkn_sec %5lu\n",
754                         i,
755                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
756                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
757 #endif
758             }   
759         } else {
760             /* a pdp_st has occurred. */
761
762             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
763              * occurred up to the last run.        
764             pdp_new[] contains rate*seconds from the latest run.
765             pdp_temp[] will contain the rate for cdp */
766
767             for(i=0;i<rrd.stat_head->ds_cnt;i++){
768                 /* update pdp_prep to the current pdp_st */
769                 if(isnan(pdp_new[i]))
770                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
771                 else
772                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
773                         pdp_new[i]/(double)interval*(double)pre_int;
774
775                 /* if too much of the pdp_prep is unknown we dump it */
776                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
777                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
778                     (occu_pdp_st-proc_pdp_st <= 
779                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
780                     pdp_temp[i] = DNAN;
781                 } else {
782                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
783                         / (double)( occu_pdp_st
784                                    - proc_pdp_st
785                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
786                 }
787
788                 /* process CDEF data sources; remember each CDEF DS can
789                  * only reference other DS with a lower index number */
790             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
791                    rpnp_t *rpnp;
792                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
793                    /* substitute data values for OP_VARIABLE nodes */
794                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
795                    {
796                           if (rpnp[ii].op == OP_VARIABLE) {
797                                  rpnp[ii].op = OP_NUMBER;
798                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
799                           }
800                    }
801                    /* run the rpn calculator */
802                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
803                           free(rpnp);
804                           break; /* exits the data sources pdp_temp loop */
805                    }
806                 }
807         
808                 /* make pdp_prep ready for the next run */
809                 if(isnan(pdp_new[i])){
810                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
811                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
812                 } else {
813                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
815                         pdp_new[i]/(double)interval*(double)post_int;
816                 }
817
818 #ifdef DEBUG
819                 fprintf(stderr,
820                         "PDP UPD ds[%lu]\t"
821                         "pdp_temp %10.2f\t"
822                         "new_prep %10.2f\t"
823                         "new_unkn_sec %5lu\n",
824                         i, pdp_temp[i],
825                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
826                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 #endif
828             }
829
830                 /* if there were errors during the last loop, bail out here */
831             if (rrd_test_error()){
832                free(step_start);
833                break;
834             }
835
836                 /* compute the number of elapsed pdp_st moments */
837                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
838 #ifdef DEBUG
839                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
840 #endif
841                 if (rra_step_cnt == NULL)
842                 {
843                    rra_step_cnt = (unsigned long *) 
844                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
845                 }
846
847             for(i = 0, rra_start = rra_begin;
848                 i < rrd.stat_head->rra_cnt;
849             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
850                 i++)
851                 {
852                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855         if (start_pdp_offset <= elapsed_pdp_st) {
856            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
857                       rrd.rra_def[i].pdp_cnt + 1;
858             } else {
859                    rra_step_cnt[i] = 0;
860                 }
861
862                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
863                 {
864                    /* If this is a bulk update, we need to skip ahead in the seasonal
865                         * arrays so that they will be correct for the next observed value;
866                         * note that for the bulk update itself, no update will occur to
867                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
868                         * be set to DNAN. */
869            if (rra_step_cnt[i] > 2) 
870                    {
871                           /* skip update by resetting rra_step_cnt[i],
872                            * note that this is not data source specific; this is due
873                            * to the bulk update, not a DNAN value for the specific data
874                            * source. */
875                           rra_step_cnt[i] = 0;
876               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
877                              &last_seasonal_coef);
878                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879                              &seasonal_coef);
880                    }
881                 
882                   /* periodically run a smoother for seasonal effects */
883                   /* Need to use first cdp parameter buffer to track
884                    * burnin (burnin requires a specific smoothing schedule).
885                    * The CDP_init_seasonal parameter is really an RRA level,
886                    * not a data source within RRA level parameter, but the rra_def
887                    * is read only for rrd_update (not flushed to disk). */
888                   iii = i*(rrd.stat_head -> ds_cnt);
889                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
890                           <= BURNIN_CYCLES)
891                   {
892                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
893                                  > rrd.rra_def[i].row_cnt - 1) {
894                            /* mark off one of the burnin cycles */
895                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896                        schedule_smooth = 1;
897                          }  
898                   } else {
899                          /* someone has no doubt invented a trick to deal with this
900                           * wrap around, but at least this code is clear. */
901                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902                              rrd.rra_ptr[i].cur_row)
903                          {
904                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905                                   * mapping between PDP and CDP */
906                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908                                  {
909 #ifdef DEBUG
910                                         fprintf(stderr,
911                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
913                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
914 #endif
915                                         schedule_smooth = 1;
916                                  }
917              } else {
918                                  /* can't rely on negative numbers because we are working with
919                                   * unsigned values */
920                                  /* Don't need modulus here. If we've wrapped more than once, only
921                                   * one smooth is executed at the end. */
922                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925                                  {
926 #ifdef DEBUG
927                                         fprintf(stderr,
928                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
930                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932                                         schedule_smooth = 1;
933                                  }
934                          }
935                   }
936
937               rra_current = ftell(rrd_file); 
938                 } /* if cf is DEVSEASONAL or SEASONAL */
939
940         if (rrd_test_error()) break;
941
942                     /* update CDP_PREP areas */
943                     /* loop over data sources within each RRA */
944                     for(ii = 0;
945                         ii < rrd.stat_head->ds_cnt;
946                         ii++)
947                         {
948                         
949                         /* iii indexes the CDP prep area for this data source within the RRA */
950                         iii=i*rrd.stat_head->ds_cnt+ii;
951
952                         if (rrd.rra_def[i].pdp_cnt > 1) {
953                           
954                            if (rra_step_cnt[i] > 0) {
955                            /* If we are in this block, at least 1 CDP value will be written to
956                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957                                 * to be written, then the "fill in" value is the CDP_secondary_val
958                                 * entry. */
959                                   if (isnan(pdp_temp[ii]))
960                   {
961                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
963                                   } else {
964                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
965                                           * CDP data entries. No matter the CF, the value is the same because
966                                           * the average, max, min, and last of a list of identical values is
967                                           * the same, namely, the value itself. */
968                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
969                                   }
970                      
971                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972                                       > rrd.rra_def[i].pdp_cnt*
973                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
974                                   {
975                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976                                          /* initialize carry over */
977                                          if (current_cf == CF_AVERAGE) {
978                                                    if (isnan(pdp_temp[ii])) { 
979                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
980                                                    } else {
981                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
983                                                    }
984                                          } else {
985                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
986                                          }
987                                   } else {
988                                          rrd_value_t cum_val, cur_val; 
989                                      switch (current_cf) {
990                                                 case CF_AVERAGE:
991                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
993                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994                                                (cum_val + cur_val * start_pdp_offset) /
995                                            (rrd.rra_def[i].pdp_cnt
996                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997                                                    /* initialize carry over value */
998                                                    if (isnan(pdp_temp[ii])) { 
999                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000                                                    } else {
1001                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003                                                    }
1004                                                    break;
1005                                                 case CF_MAXIMUM:
1006                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1008 #ifdef DEBUG
1009                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010                                                           isnan(pdp_temp[ii])) {
1011                                                      fprintf(stderr,
1012                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1013                                                                 i,ii);
1014                                                          exit(-1);
1015                                                   }
1016 #endif
1017                                                   if (cur_val > cum_val)
1018                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1019                                                   else
1020                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021                                                   /* initialize carry over value */
1022                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1023                                                   break;
1024                                                 case CF_MINIMUM:
1025                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1027 #ifdef DEBUG
1028                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029                                                           isnan(pdp_temp[ii])) {
1030                                                      fprintf(stderr,
1031                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1032                                                                 i,ii);
1033                                                          exit(-1);
1034                                                   }
1035 #endif
1036                                                   if (cur_val < cum_val)
1037                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1038                                                   else
1039                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040                                                   /* initialize carry over value */
1041                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042                                                   break;
1043                                                 case CF_LAST:
1044                                                 default:
1045                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046                                                    /* initialize carry over value */
1047                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048                                                 break;
1049                                          }
1050                                   } /* endif meets xff value requirement for a valid value */
1051                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1052                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053                                   if (isnan(pdp_temp[ii]))
1054                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1055                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1056                                   else
1057                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058                } else  /* rra_step_cnt[i]  == 0 */
1059                            {
1060 #ifdef DEBUG
1061                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1063                                          i,ii);
1064                                   } else {
1065                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1067                                   }
1068 #endif
1069                                   if (isnan(pdp_temp[ii])) {
1070                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1072                                   {
1073                                          if (current_cf == CF_AVERAGE) {
1074                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1075                                                    elapsed_pdp_st;
1076                                          } else {
1077                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078                                          }
1079 #ifdef DEBUG
1080                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1082 #endif
1083                                   } else {
1084                                          switch (current_cf) {
1085                                          case CF_AVERAGE:
1086                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087                                                    elapsed_pdp_st;
1088                                                 break;
1089                                          case CF_MINIMUM:
1090                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1092                                                 break; 
1093                                          case CF_MAXIMUM:
1094                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096                                                 break; 
1097                                          case CF_LAST:
1098                                          default:
1099                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100                                                 break;
1101                                          }
1102                                   }
1103                            }
1104                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105                            if (elapsed_pdp_st > 2)
1106                            {
1107                                    switch (current_cf) {
1108                                    case CF_AVERAGE:
1109                                    default:
1110                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1112                                           break;
1113                    case CF_SEASONAL:
1114                                    case CF_DEVSEASONAL:
1115                                           /* need to update cached seasonal values, so they are consistent
1116                                            * with the bulk update */
1117                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118                                            * CDP_last_deviation are the same. */
1119                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120                                                  last_seasonal_coef[ii];
1121                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122                                                  seasonal_coef[ii];
1123                                           break;
1124                    case CF_HWPREDICT:
1125                                           /* need to update the null_count and last_null_count.
1126                                            * even do this for non-DNAN pdp_temp because the
1127                                            * algorithm is not learning from batch updates. */
1128                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1129                                                  elapsed_pdp_st;
1130                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1131                                                  elapsed_pdp_st - 1;
1132                                           /* fall through */
1133                                    case CF_DEVPREDICT:
1134                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1136                                           break;
1137                    case CF_FAILURES:
1138                                           /* do not count missed bulk values as failures */
1139                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141                                           /* need to reset violations buffer.
1142                                            * could do this more carefully, but for now, just
1143                                            * assume a bulk update wipes away all violations. */
1144                       erase_violations(&rrd, iii, i);
1145                                           break;
1146                                    }
1147                            } 
1148                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1149
1150                         if (rrd_test_error()) break;
1151
1152                         } /* endif data sources loop */
1153         } /* end RRA Loop */
1154
1155                 /* this loop is only entered if elapsed_pdp_st < 3 */
1156                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1157                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1158                 {
1159                for(i = 0, rra_start = rra_begin;
1160                    i < rrd.stat_head->rra_cnt;
1161                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1162                    i++)
1163                    {
1164                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1165
1166                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1168                           {
1169                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1171                                 &seasonal_coef);
1172                  rra_current = ftell(rrd_file);
1173                           }
1174                           if (rrd_test_error()) break;
1175                       /* loop over data sources within each RRA */
1176                       for(ii = 0;
1177                           ii < rrd.stat_head->ds_cnt;
1178                           ii++)
1179                           {
1180                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182                                     scratch_idx, seasonal_coef);
1183                           }
1184            } /* end RRA Loop */
1185                    if (rrd_test_error()) break;
1186             } /* end elapsed_pdp_st loop */
1187
1188                 if (rrd_test_error()) break;
1189
1190                 /* Ready to write to disk */
1191                 /* Move sequentially through the file, writing one RRA at a time.
1192                  * Note this architecture divorces the computation of CDP with
1193                  * flushing updated RRA entries to disk. */
1194             for(i = 0, rra_start = rra_begin;
1195                 i < rrd.stat_head->rra_cnt;
1196             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1197                 i++) {
1198                 /* is there anything to write for this RRA? If not, continue. */
1199         if (rra_step_cnt[i] == 0) continue;
1200
1201                 /* write the first row */
1202 #ifdef DEBUG
1203         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1204 #endif
1205             rrd.rra_ptr[i].cur_row++;
1206             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208                 /* position on the first row */
1209                 rra_pos_tmp = rra_start +
1210                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211                 if(rra_pos_tmp != rra_current) {
1212                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1213                       rrd_set_error("seek error in rrd");
1214                       break;
1215                    }
1216                    rra_current = rra_pos_tmp;
1217                 }
1218
1219 #ifdef DEBUG
1220             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1221 #endif
1222                 scratch_idx = CDP_primary_val;
1223                 if (pcdp_summary != NULL)
1224                 {
1225                    rra_time = (current_time - current_time 
1226                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1227                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1228                 }
1229                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1230                    pcdp_summary, &rra_time);
1231                 if (rrd_test_error()) break;
1232
1233                 /* write other rows of the bulk update, if any */
1234                 scratch_idx = CDP_secondary_val;
1235                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1236                 {
1237                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1238                    {
1239 #ifdef DEBUG
1240               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1241                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1242 #endif
1243                           /* wrap */
1244                           rrd.rra_ptr[i].cur_row = 0;
1245                           /* seek back to beginning of current rra */
1246                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1247                           {
1248                          rrd_set_error("seek error in rrd");
1249                          break;
1250                           }
1251 #ifdef DEBUG
1252                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1253 #endif
1254                           rra_current = rra_start;
1255                    }
1256                    if (pcdp_summary != NULL)
1257                    {
1258                       rra_time = (current_time - current_time 
1259                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1260                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1261                    }
1262                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1263                       pcdp_summary, &rra_time);
1264                 }
1265                 
1266                 if (rrd_test_error())
1267                   break;
1268                 } /* RRA LOOP */
1269
1270             /* break out of the argument parsing loop if error_string is set */
1271             if (rrd_test_error()){
1272                    free(step_start);
1273                    break;
1274             } 
1275             
1276         } /* endif a pdp_st has occurred */ 
1277         rrd.live_head->last_up = current_time;
1278         rrd.live_head->last_up_usec = current_time_usec; 
1279         free(step_start);
1280     } /* function argument loop */
1281
1282     if (seasonal_coef != NULL) free(seasonal_coef);
1283     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1284         if (rra_step_cnt != NULL) free(rra_step_cnt);
1285     rpnstack_free(&rpnstack);
1286
1287     /* if we got here and if there is an error and if the file has not been
1288      * written to, then close things up and return. */
1289     if (rrd_test_error()) {
1290         free(updvals);
1291         free(tmpl_idx);
1292         rrd_free(&rrd);
1293         free(pdp_temp);
1294         free(pdp_new);
1295         fclose(rrd_file);
1296         return(-1);
1297     }
1298
1299     /* aargh ... that was tough ... so many loops ... anyway, its done.
1300      * we just need to write back the live header portion now*/
1301
1302     if (fseek(rrd_file, (sizeof(stat_head_t)
1303                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1304                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1305               SEEK_SET) != 0) {
1306         rrd_set_error("seek rrd for live header writeback");
1307         free(updvals);
1308         free(tmpl_idx);
1309         rrd_free(&rrd);
1310         free(pdp_temp);
1311         free(pdp_new);
1312         fclose(rrd_file);
1313         return(-1);
1314     }
1315
1316     if(version >= 3) {
1317             if(fwrite( rrd.live_head,
1318                        sizeof(live_head_t), 1, rrd_file) != 1){
1319                 rrd_set_error("fwrite live_head to rrd");
1320                 free(updvals);
1321                 rrd_free(&rrd);
1322                 free(tmpl_idx);
1323                 free(pdp_temp);
1324                 free(pdp_new);
1325                 fclose(rrd_file);
1326                 return(-1);
1327             }
1328     }
1329     else {
1330             if(fwrite( &rrd.live_head->last_up,
1331                        sizeof(time_t), 1, rrd_file) != 1){
1332                 rrd_set_error("fwrite live_head to rrd");
1333                 free(updvals);
1334                 rrd_free(&rrd);
1335                 free(tmpl_idx);
1336                 free(pdp_temp);
1337                 free(pdp_new);
1338                 fclose(rrd_file);
1339                 return(-1);
1340             }
1341     }
1342             
1343
1344     if(fwrite( rrd.pdp_prep,
1345                sizeof(pdp_prep_t),
1346                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1347         rrd_set_error("ftwrite pdp_prep to rrd");
1348         free(updvals);
1349         rrd_free(&rrd);
1350         free(tmpl_idx);
1351         free(pdp_temp);
1352         free(pdp_new);
1353         fclose(rrd_file);
1354         return(-1);
1355     }
1356
1357     if(fwrite( rrd.cdp_prep,
1358                sizeof(cdp_prep_t),
1359                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1360        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1361
1362         rrd_set_error("ftwrite cdp_prep to rrd");
1363         free(updvals);
1364         free(tmpl_idx);
1365         rrd_free(&rrd);
1366         free(pdp_temp);
1367         free(pdp_new);
1368         fclose(rrd_file);
1369         return(-1);
1370     }
1371
1372     if(fwrite( rrd.rra_ptr,
1373                sizeof(rra_ptr_t), 
1374                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1375         rrd_set_error("fwrite rra_ptr to rrd");
1376         free(updvals);
1377         free(tmpl_idx);
1378         rrd_free(&rrd);
1379         free(pdp_temp);
1380         free(pdp_new);
1381         fclose(rrd_file);
1382         return(-1);
1383     }
1384
1385     /* OK now close the files and free the memory */
1386     if(fclose(rrd_file) != 0){
1387         rrd_set_error("closing rrd");
1388         free(updvals);
1389         free(tmpl_idx);
1390         rrd_free(&rrd);
1391         free(pdp_temp);
1392         free(pdp_new);
1393         return(-1);
1394     }
1395
1396     /* calling the smoothing code here guarantees at most
1397          * one smoothing operation per rrd_update call. Unfortunately,
1398          * it is possible with bulk updates, or a long-delayed update
1399          * for smoothing to occur off-schedule. This really isn't
1400          * critical except during the burning cycles. */
1401         if (schedule_smooth)
1402         {
1403 #ifndef WIN32
1404           rrd_file = fopen(filename,"r+");
1405 #else
1406           rrd_file = fopen(filename,"rb+");
1407 #endif
1408           rra_start = rra_begin;
1409           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1410           {
1411             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1412                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1413             {
1414 #ifdef DEBUG
1415               fprintf(stderr,"Running smoother for rra %ld\n",i);
1416 #endif
1417               apply_smoother(&rrd,i,rra_start,rrd_file);
1418               if (rrd_test_error())
1419                 break;
1420             }
1421             rra_start += rrd.rra_def[i].row_cnt
1422               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1423           }
1424           fclose(rrd_file);
1425         }
1426     rrd_free(&rrd);
1427     free(updvals);
1428     free(tmpl_idx);
1429     free(pdp_new);
1430     free(pdp_temp);
1431     return(0);
1432 }
1433
/*
 * Try to take an exclusive lock covering the whole RRD file.
 *
 * The lock is requested through the descriptor behind `rrdfile`
 * (fcntl(F_SETLK) normally, _locking(_LK_NBLCK) under WIN32) and is
 * dropped again when the file is closed.
 *
 * Returns the status of the underlying locking call: 0 on success.
 * Under WIN32, -1 is also returned when the file size cannot be
 * determined with _fstat().
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */
    int rc;
#ifdef WIN32
    struct _stat st;

    /* _locking() needs a byte count, so look up the file size first */
    if (_fstat(fd, &st) != 0)
        return(-1);
    rc = _locking(fd, _LK_NBLCK, st.st_size);
#else
    struct flock whole_file;

    whole_file.l_type   = F_WRLCK;   /* exclusive write lock */
    whole_file.l_whence = SEEK_SET;  /* l_start counts from the start of the file */
    whole_file.l_start  = 0;         /* begin at offset 0 ... */
    whole_file.l_len    = 0;         /* ... and cover the whole file */

    rc = fcntl(fd, F_SETLK, &whole_file);
#endif
    return(rc);
}
1470
1471
1472 info_t
1473 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1474                unsigned short CDP_scratch_idx, FILE *rrd_file,
1475                    info_t *pcdp_summary, time_t *rra_time)
1476 {
1477    unsigned long ds_idx, cdp_idx;
1478    infoval iv;
1479   
1480    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1481    {
1482       /* compute the cdp index */
1483       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1484 #ifdef DEBUG
1485           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1486              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1487              rrd -> rra_def[rra_idx].cf_nam);
1488 #endif 
1489       if (pcdp_summary != NULL)
1490           {
1491              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1492              /* append info to the return hash */
1493                  pcdp_summary = info_push(pcdp_summary,
1494                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1495                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1496                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1497          RD_I_VAL, iv);
1498           }
1499           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1500                  sizeof(rrd_value_t),1,rrd_file) != 1)
1501           { 
1502              rrd_set_error("writing rrd");
1503              return 0;
1504           }
1505           *rra_current += sizeof(rrd_value_t);
1506         }
1507         return (pcdp_summary);
1508 }