should not assigne but compare ... grrrrr
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.12  2003/09/04 13:16:12  oetiker
9  * should not assigne but compare ... grrrrr
10  *
11  * Revision 1.11  2003/09/02 21:58:35  oetiker
12  * be pickier about what we accept in rrd_update. Complain if things do not work out
13  *
14  * Revision 1.10  2003/04/29 19:14:12  jake
15  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
16  * Also revert accidental addition of -I to aclocal MakeMakefile.
17  *
18  * Revision 1.9  2003/04/25 18:35:08  jake
19  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
20  *
21  * Revision 1.8  2003/03/31 21:22:12  oetiker
22  * enables RRDtool updates with microsecond or in case of windows millisecond
23  * precision. This is needed to reduce time measurement error when archive step
24  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
25  *
26  * Revision 1.7  2003/02/13 07:05:27  oetiker
27  * Find attached the patch I promised to send to you. Please note that there
28  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
29  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
30  * library is identical to librrd, but it contains support code for per-thread
31  * global variables currently used for error information only. This is similar
32  * to how errno per-thread variables are implemented.  librrd_th must be linked
33  * alongside of libpthred
34  *
35  * There is also a new file "THREADS", holding some documentation.
36  *
37  * -- Peter Stamfest <peter@stamfest.at>
38  *
39  * Revision 1.6  2002/02/01 20:34:49  oetiker
40  * fixed version number and date/time
41  *
42  * Revision 1.5  2001/05/09 05:31:01  oetiker
43  * Bug fix: when update of multiple PDP/CDP RRAs coincided
44  * with interpolation of multiple PDPs an incorrect value was
45  * stored as the CDP. Especially evident for GAUGE data sources.
46  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
47  *
48  * Revision 1.4  2001/03/10 23:54:41  oetiker
49  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
50  * parser and calculator from rrd_graph and puts then in a new file,
51  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
52  * clean-up of aberrant behavior stuff, including a bug fix.
53  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
54  * -- Jake Brutlag <jakeb@corp.webtv.net>
55  *
56  * Revision 1.3  2001/03/04 13:01:55  oetiker
57  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
58  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
59  * This is backwards compatible! But new files using the Aberrant stuff are not readable
60  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
61  * -- Jake Brutlag <jakeb@corp.webtv.net>
62  *
63  * Revision 1.2  2001/03/04 11:14:25  oetiker
64  * added at-style-time@value:value syntax to rrd_update
65  * --  Dave Bodenstab <imdave@mcs.net>
66  *
67  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
68  * checkin
69  *
70  *****************************************************************************/
71
72 #include "rrd_tool.h"
73 #include <sys/types.h>
74 #include <fcntl.h>
75
76 #ifdef WIN32
77  #include <sys/locking.h>
78  #include <sys/stat.h>
79  #include <io.h>
80 #endif
81
82 #include "rrd_hw.h"
83 #include "rrd_rpncalc.h"
84
85 #include "rrd_is_thread_safe.h"
86
87 #ifdef WIN32
88 /*
89  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
90  * replacement.
91  */
92 #include <sys/timeb.h>
93
94 struct timeval {
95         time_t tv_sec; /* seconds */
96         long tv_usec;  /* microseconds */
97 };
98
99 struct __timezone {
100         int  tz_minuteswest; /* minutes W of Greenwich */
101         int  tz_dsttime;     /* type of dst correction */
102 };
103
104 static gettimeofday(struct timeval *t, struct __timezone *tz) {
105         
106         struct timeb current_time;
107
108         _ftime(&current_time);
109         
110         t->tv_sec  = current_time.time;
111         t->tv_usec = current_time.millitm * 1000;
112 }
113
114 #endif
115 /*
116  * normilize time as returned by gettimeofday. usec part must
117  * be always >= 0
118  */
119 static void normalize_time(struct timeval *t)
120 {
121         if(t->tv_usec < 0) {
122                 t->tv_sec--;
123                 t->tv_usec += 1000000L;
124         }
125 }
126
127 /* Local prototypes */
128 int LockRRD(FILE *rrd_file);
129 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
130                                         unsigned long *rra_current,
131                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
132                                         info_t *pcdp_summary, time_t *rra_time);
133 int rrd_update_r(char *filename, char *template, int argc, char **argv);
134 int _rrd_update(char *filename, char *template, int argc, char **argv, 
135                                         info_t*);
136
137 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
138
139
140 #ifdef STANDALONE
141 int 
142 main(int argc, char **argv){
143         rrd_update(argc,argv);
144         if (rrd_test_error()) {
145                 printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
146                         "Usage: rrdupdate filename\n"
147                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
148                         "\t\t\ttime|N:value[:value...]\n\n"
149                         "\t\t\tat-time@value[:value...]\n\n"
150                         "\t\t\t[ time:value[:value...] ..]\n\n");
151                                    
152                 printf("ERROR: %s\n",rrd_get_error());
153                 rrd_clear_error();                                                            
154                 return 1;
155         }
156         return 0;
157 }
158 #endif
159
160 info_t *rrd_update_v(int argc, char **argv)
161 {
162     char             *template = NULL;          
163         info_t *result = NULL;
164         infoval rc;
165
166     while (1) {
167                 static struct option long_options[] =
168                         {
169                                 {"template",      required_argument, 0, 't'},
170                                 {0,0,0,0}
171                         };
172                 int option_index = 0;
173                 int opt;
174                 opt = getopt_long(argc, argv, "t:", 
175                                                   long_options, &option_index);
176                 
177                 if (opt == EOF)
178                         break;
179                 
180                 switch(opt) {
181                 case 't':
182                         template = optarg;
183                         break;
184                 
185                 case '?':
186                         rrd_set_error("unknown option '%s'",argv[optind-1]);
187             rc.u_int = -1;
188                         goto end_tag;
189                 }
190     }
191
192     /* need at least 2 arguments: filename, data. */
193     if (argc-optind < 2) {
194                 rrd_set_error("Not enough arguments");
195         rc.u_int = -1;
196                 goto end_tag;
197     }
198     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
199         rc.u_int = _rrd_update(argv[optind], template,
200                       argc - optind - 1, argv + optind + 1, result);
201     result->value.u_int = rc.u_int;
202 end_tag:
203     return result;
204 }
205
206 int
207 rrd_update(int argc, char **argv)
208 {
209     char             *template = NULL;          
210     int rc;
211
212     while (1) {
213                 static struct option long_options[] =
214                         {
215                                 {"template",      required_argument, 0, 't'},
216                                 {0,0,0,0}
217                         };
218                 int option_index = 0;
219                 int opt;
220                 opt = getopt_long(argc, argv, "t:", 
221                                                   long_options, &option_index);
222                 
223                 if (opt == EOF)
224                         break;
225                 
226                 switch(opt) {
227                 case 't':
228                         template = optarg;
229                         break;
230                 
231                 case '?':
232                         rrd_set_error("unknown option '%s'",argv[optind-1]);
233                         return(-1);
234                 }
235     }
236
237     /* need at least 2 arguments: filename, data. */
238     if (argc-optind < 2) {
239                 rrd_set_error("Not enough arguments");
240
241                 return -1;
242     }
243  
244         rc = rrd_update_r(argv[optind], template,
245                       argc - optind - 1, argv + optind + 1);
246     return rc;
247 }
248
249 int
250 rrd_update_r(char *filename, char *template, int argc, char **argv)
251 {
252    return _rrd_update(filename, template, argc, argv, NULL);
253 }
254
255 int
256 _rrd_update(char *filename, char *template, int argc, char **argv, 
257    info_t *pcdp_summary)
258 {
259
260     int              arg_i = 2;
261     short            j;
262     unsigned long    i,ii,iii=1;
263
264     unsigned long    rra_begin;          /* byte pointer to the rra
265                                           * area in the rrd file.  this
266                                           * pointer never changes value */
267     unsigned long    rra_start;          /* byte pointer to the rra
268                                           * area in the rrd file.  this
269                                           * pointer changes as each rrd is
270                                           * processed. */
271     unsigned long    rra_current;        /* byte pointer to the current write
272                                           * spot in the rrd file. */
273     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
274     double           interval,
275         pre_int,post_int;                /* interval between this and
276                                           * the last run */
277     unsigned long    proc_pdp_st;        /* which pdp_st was the last
278                                           * to be processed */
279     unsigned long    occu_pdp_st;        /* when was the pdp_st
280                                           * before the last update
281                                           * time */
282     unsigned long    proc_pdp_age;       /* how old was the data in
283                                           * the pdp prep area when it
284                                           * was last updated */
285     unsigned long    occu_pdp_age;       /* how long ago was the last
286                                           * pdp_step time */
287     rrd_value_t      *pdp_new;           /* prepare the incoming data
288                                           * to be added the the
289                                           * existing entry */
290     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
291                                           * to be added the the
292                                           * cdp values */
293
294     long             *tmpl_idx;          /* index representing the settings
295                                             transported by the template index */
296     unsigned long    tmpl_cnt = 2;       /* time and data */
297
298     FILE             *rrd_file;
299     rrd_t            rrd;
300     time_t           current_time;
301         time_t           rra_time; /* time of update for a RRA */
302     unsigned long    current_time_usec;  /* microseconds part of current time */
303     struct timeval   tmp_time;           /* used for time conversion */
304
305     char             **updvals;
306     int              schedule_smooth = 0;
307         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
308                                          /* a vector of future Holt-Winters seasonal coefs */
309     unsigned long    elapsed_pdp_st;
310                                          /* number of elapsed PDP steps since last update */
311     unsigned long    *rra_step_cnt = NULL;
312                                          /* number of rows to be updated in an RRA for a data
313                                           * value. */
314     unsigned long    start_pdp_offset;
315                                          /* number of PDP steps since the last update that
316                                           * are assigned to the first CDP to be generated
317                                           * since the last update. */
318     unsigned short   scratch_idx;
319                                          /* index into the CDP scratch array */
320     enum cf_en       current_cf;
321                                          /* numeric id of the current consolidation function */
322     rpnstack_t       rpnstack; /* used for COMPUTE DS */
323     int              version;  /* rrd version */
324     char             *endptr; /* used in the conversion */
325
326     rpnstack_init(&rpnstack);
327
328     /* need at least 1 arguments: data. */
329     if (argc < 1) {
330         rrd_set_error("Not enough arguments");
331         return -1;
332     }
333     
334     
335
336     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
337         return -1;
338     }
339     /* initialize time */
340     version = atoi(rrd.stat_head->version);
341     gettimeofday(&tmp_time, 0);
342     normalize_time(&tmp_time);
343     current_time = tmp_time.tv_sec;
344     if(version >= 3) {
345         current_time_usec = tmp_time.tv_usec;
346     }
347     else {
348         current_time_usec = 0;
349     }
350
351     rra_current = rra_start = rra_begin = ftell(rrd_file);
352     /* This is defined in the ANSI C standard, section 7.9.5.3:
353
354         When a file is opened with udpate mode ('+' as the second
355         or third character in the ... list of mode argument
356         variables), both input and ouptut may be performed on the
357         associated stream.  However, ...  input may not be directly
358         followed by output without an intervening call to a file
359         positioning function, unless the input oepration encounters
360         end-of-file. */
361     fseek(rrd_file, 0, SEEK_CUR);
362
363     
364     /* get exclusive lock to whole file.
365      * lock gets removed when we close the file.
366      */
367     if (LockRRD(rrd_file) != 0) {
368       rrd_set_error("could not lock RRD");
369       rrd_free(&rrd);
370       fclose(rrd_file);
371       return(-1);   
372     } 
373
374     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
375         rrd_set_error("allocating updvals pointer array");
376         rrd_free(&rrd);
377         fclose(rrd_file);
378         return(-1);
379     }
380
381     if ((pdp_temp = malloc(sizeof(rrd_value_t)
382                            *rrd.stat_head->ds_cnt))==NULL){
383         rrd_set_error("allocating pdp_temp ...");
384         free(updvals);
385         rrd_free(&rrd);
386         fclose(rrd_file);
387         return(-1);
388     }
389
390     if ((tmpl_idx = malloc(sizeof(unsigned long)
391                            *(rrd.stat_head->ds_cnt+1)))==NULL){
392         rrd_set_error("allocating tmpl_idx ...");
393         free(pdp_temp);
394         free(updvals);
395         rrd_free(&rrd);
396         fclose(rrd_file);
397         return(-1);
398     }
399     /* initialize template redirector */
400     /* default config example (assume DS 1 is a CDEF DS)
401        tmpl_idx[0] -> 0; (time)
402        tmpl_idx[1] -> 1; (DS 0)
403        tmpl_idx[2] -> 3; (DS 2)
404        tmpl_idx[3] -> 4; (DS 3) */
405     tmpl_idx[0] = 0; /* time */
406     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
407         {
408            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
409               tmpl_idx[ii++]=i;
410         }
411     tmpl_cnt= ii;
412
413     if (template) {
414         char *dsname;
415         unsigned int tmpl_len;
416         dsname = template;
417         tmpl_cnt = 1; /* the first entry is the time */
418         tmpl_len = strlen(template);
419         for(i=0;i<=tmpl_len ;i++) {
420             if (template[i] == ':' || template[i] == '\0') {
421                 template[i] = '\0';
422                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
423                     rrd_set_error("Template contains more DS definitions than RRD");
424                     free(updvals); free(pdp_temp);
425                     free(tmpl_idx); rrd_free(&rrd);
426                     fclose(rrd_file); return(-1);
427                 }
428                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
429                     rrd_set_error("unknown DS name '%s'",dsname);
430                     free(updvals); free(pdp_temp);
431                     free(tmpl_idx); rrd_free(&rrd);
432                     fclose(rrd_file); return(-1);
433                 } else {
434                   /* the first element is always the time */
435                   tmpl_idx[tmpl_cnt-1]++; 
436                   /* go to the next entry on the template */
437                   dsname = &template[i+1];
438                   /* fix the damage we did before */
439                   if (i<tmpl_len) {
440                      template[i]=':';
441                   } 
442
443                 }
444             }       
445         }
446     }
447     if ((pdp_new = malloc(sizeof(rrd_value_t)
448                           *rrd.stat_head->ds_cnt))==NULL){
449         rrd_set_error("allocating pdp_new ...");
450         free(updvals);
451         free(pdp_temp);
452         free(tmpl_idx);
453         rrd_free(&rrd);
454         fclose(rrd_file);
455         return(-1);
456     }
457
458     /* loop through the arguments. */
459     for(arg_i=0; arg_i<argc;arg_i++) {
460         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
461         char *step_start = stepper;
462         char *p;
463         char *parsetime_error = NULL;
464         enum {atstyle, normal} timesyntax;
465         struct time_value ds_tv;
466         if (stepper == NULL){
467                 rrd_set_error("failed duplication argv entry");
468                 free(updvals);
469                 free(pdp_temp);  
470                 free(tmpl_idx);
471                 rrd_free(&rrd);
472                 fclose(rrd_file);
473                 return(-1);
474          }
475         /* initialize all ds input to unknown except the first one
476            which has always got to be set */
477         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
478         strcpy(stepper,argv[arg_i]);
479         updvals[0]=stepper;
480         /* separate all ds elements; first must be examined separately
481            due to alternate time syntax */
482         if ((p=strchr(stepper,'@'))!=NULL) {
483             timesyntax = atstyle;
484             *p = '\0';
485             stepper = p+1;
486         } else if ((p=strchr(stepper,':'))!=NULL) {
487             timesyntax = normal;
488             *p = '\0';
489             stepper = p+1;
490         } else {
491             rrd_set_error("expected timestamp not found in data source from %s:...",
492                           argv[arg_i]);
493             free(step_start);
494             break;
495         }
496         ii=1;
497         updvals[tmpl_idx[ii]] = stepper;
498         while (*stepper) {
499             if (*stepper == ':') {
500                 *stepper = '\0';
501                 ii++;
502                 if (ii<tmpl_cnt){                   
503                     updvals[tmpl_idx[ii]] = stepper+1;
504                 }
505             }
506             stepper++;
507         }
508
509         if (ii != tmpl_cnt-1) {
510             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
511                           tmpl_cnt-1, ii, argv[arg_i]);
512             free(step_start);
513             break;
514         }
515         
516         /* get the time from the reading ... handle N */
517         if (timesyntax == atstyle) {
518             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
519                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
520                 free(step_start);
521                 break;
522             }
523             if (ds_tv.type == RELATIVE_TO_END_TIME ||
524                 ds_tv.type == RELATIVE_TO_START_TIME) {
525                 rrd_set_error("specifying time relative to the 'start' "
526                               "or 'end' makes no sense here: %s",
527                               updvals[0]);
528                 free(step_start);
529                 break;
530             }
531
532             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
533             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
534             
535         } else if (strcmp(updvals[0],"N")==0){
536             gettimeofday(&tmp_time, 0);
537             normalize_time(&tmp_time);
538             current_time = tmp_time.tv_sec;
539             current_time_usec = tmp_time.tv_usec;
540         } else {
541             double tmp;
542             tmp = strtod(updvals[0], 0);
543             current_time = floor(tmp);
544             current_time_usec = (long)((tmp - current_time) * 1000000L);
545         }
546         /* dont do any correction for old version RRDs */
547         if(version < 3) 
548             current_time_usec = 0;
549         
550         if(current_time <= rrd.live_head->last_up){
551             rrd_set_error("illegal attempt to update using time %ld when "
552                           "last update time is %ld (minimum one second step)",
553                           current_time, rrd.live_head->last_up);
554             free(step_start);
555             break;
556         }
557         
558         
559         /* seek to the beginning of the rra's */
560         if (rra_current != rra_begin) {
561             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
562                 rrd_set_error("seek error in rrd");
563                 free(step_start);
564                 break;
565             }
566             rra_current = rra_begin;
567         }
568         rra_start = rra_begin;
569
570         /* when was the current pdp started */
571         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
572         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
573
574         /* when did the last pdp_st occur */
575         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
576         occu_pdp_st = current_time - occu_pdp_age;
577         /* interval = current_time - rrd.live_head->last_up; */
578         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
579     
580         if (occu_pdp_st > proc_pdp_st){
581             /* OK we passed the pdp_st moment*/
582             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
583                                                               * occurred before the latest
584                                                               * pdp_st moment*/
585             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
586             post_int = occu_pdp_age;                         /* how much after it */
587             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
588         } else {
589             pre_int = interval;
590             post_int = 0;
591         }
592
593 #ifdef DEBUG
594         printf(
595                "proc_pdp_age %lu\t"
596                "proc_pdp_st %lu\t" 
597                "occu_pfp_age %lu\t" 
598                "occu_pdp_st %lu\t"
599                "int %lf\t"
600                "pre_int %lf\t"
601                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
602                 occu_pdp_age, occu_pdp_st,
603                interval, pre_int, post_int);
604 #endif
605     
606         /* process the data sources and update the pdp_prep 
607          * area accordingly */
608         for(i=0;i<rrd.stat_head->ds_cnt;i++){
609             enum dst_en dst_idx;
610             dst_idx= dst_conv(rrd.ds_def[i].dst);
611                 /* NOTE: DST_CDEF should never enter this if block, because
612                  * updvals[i+1][0] is initialized to 'U'; unless the caller
613                  * accidently specified a value for the DST_CDEF. To handle 
614                  * this case, an extra check is required. */
615             if((updvals[i+1][0] != 'U') &&
616                    (dst_idx != DST_CDEF) &&
617                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
618                double rate = DNAN;
619                /* the data source type defines how to process the data */
620                 /* pdp_new contains rate * time ... eg the bytes
621                  * transferred during the interval. Doing it this way saves
622                  * a lot of math operations */
623                 
624
625                 switch(dst_idx){
626                 case DST_COUNTER:
627                 case DST_DERIVE:
628                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
629                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
630                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
631                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
632                                  break;
633                             }
634                        }
635                        if (rrd_test_error()){
636                             break;
637                        }
638                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
639                        if(dst_idx == DST_COUNTER) {
640                           /* simple overflow catcher sugestet by andres kroonmaa */
641                           /* this will fail terribly for non 32 or 64 bit counters ... */
642                           /* are there any others in SNMP land ? */
643                           if (pdp_new[i] < (double)0.0 ) 
644                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
645                           if (pdp_new[i] < (double)0.0 ) 
646                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
647                        }
648                        rate = pdp_new[i] / interval;
649                     }
650                    else {
651                      pdp_new[i]= DNAN;          
652                    }
653                    break;
654                 case DST_ABSOLUTE:
655                     errno = 0;
656                     pdp_new[i] = strtod(updvals[i+1],&endptr);
657                     if (errno > 0){
658                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
659                         break;
660                     };
661                     if (endptr[0] != '\0'){
662                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
663                         break;
664                     }
665                     rate = pdp_new[i] / interval;                 
666                     break;
667                 case DST_GAUGE:
668                     errno = 0;
669                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
670                     if (errno > 0){
671                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
672                         break;
673                     };
674                     if (endptr[0] != '\0'){
675                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
676                         break;
677                     }
678                     rate = pdp_new[i] / interval;                  
679                     break;
680                 default:
681                     rrd_set_error("rrd contains unknown DS type : '%s'",
682                                   rrd.ds_def[i].dst);
683                     break;
684                 }
685                 /* break out of this for loop if the error string is set */
686                 if (rrd_test_error()){
687                     break;
688                 }
689                /* make sure pdp_temp is neither too large or too small
690                 * if any of these occur it becomes unknown ...
691                 * sorry folks ... */
692                if ( ! isnan(rate) && 
693                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
694                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
695                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
696                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
697                   pdp_new[i] = DNAN;
698                }               
699             } else {
700                 /* no news is news all the same */
701                 pdp_new[i] = DNAN;
702             }
703             
704             /* make a copy of the command line argument for the next run */
705 #ifdef DEBUG
706             fprintf(stderr,
707                     "prep ds[%lu]\t"
708                     "last_arg '%s'\t"
709                     "this_arg '%s'\t"
710                     "pdp_new %10.2f\n",
711                     i,
712                     rrd.pdp_prep[i].last_ds,
713                     updvals[i+1], pdp_new[i]);
714 #endif
715             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
716                 strncpy(rrd.pdp_prep[i].last_ds,
717                         updvals[i+1],LAST_DS_LEN-1);
718                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
719             }
720         }
721         /* break out of the argument parsing loop if the error_string is set */
722         if (rrd_test_error()){
723             free(step_start);
724             break;
725         }
726         /* has a pdp_st moment occurred since the last run ? */
727
728         if (proc_pdp_st == occu_pdp_st){
729             /* no we have not passed a pdp_st moment. therefore update is simple */
730
731             for(i=0;i<rrd.stat_head->ds_cnt;i++){
732                 if(isnan(pdp_new[i]))
733                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
734                 else
735                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
736 #ifdef DEBUG
737                 fprintf(stderr,
738                         "NO PDP  ds[%lu]\t"
739                         "value %10.2f\t"
740                         "unkn_sec %5lu\n",
741                         i,
742                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
743                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
744 #endif
745             }   
746         } else {
747             /* an pdp_st has occurred. */
748
749             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
750              * occurred up to the last run.        
751             pdp_new[] contains rate*seconds from the latest run.
752             pdp_temp[] will contain the rate for cdp */
753
754             for(i=0;i<rrd.stat_head->ds_cnt;i++){
755                 /* update pdp_prep to the current pdp_st */
756                 if(isnan(pdp_new[i]))
757                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
758                 else
759                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
760                         pdp_new[i]/(double)interval*(double)pre_int;
761
762                 /* if too much of the pdp_prep is unknown we dump it */
763                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
764                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
765                     (occu_pdp_st-proc_pdp_st <= 
766                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
767                     pdp_temp[i] = DNAN;
768                 } else {
769                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
770                         / (double)( occu_pdp_st
771                                    - proc_pdp_st
772                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
773                 }
774
775                 /* process CDEF data sources; remember each CDEF DS can
776                  * only reference other DS with a lower index number */
777             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
778                    rpnp_t *rpnp;
779                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
780                    /* substitue data values for OP_VARIABLE nodes */
781                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
782                    {
783                           if (rpnp[ii].op == OP_VARIABLE) {
784                                  rpnp[ii].op = OP_NUMBER;
785                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
786                           }
787                    }
788                    /* run the rpn calculator */
789                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
790                           free(rpnp);
791                           break; /* exits the data sources pdp_temp loop */
792                    }
793                 }
794         
795                 /* make pdp_prep ready for the next run */
796                 if(isnan(pdp_new[i])){
797                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
798                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
799                 } else {
800                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
801                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
802                         pdp_new[i]/(double)interval*(double)post_int;
803                 }
804
805 #ifdef DEBUG
806                 fprintf(stderr,
807                         "PDP UPD ds[%lu]\t"
808                         "pdp_temp %10.2f\t"
809                         "new_prep %10.2f\t"
810                         "new_unkn_sec %5lu\n",
811                         i, pdp_temp[i],
812                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
813                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
814 #endif
815             }
816
817                 /* if there were errors during the last loop, bail out here */
818             if (rrd_test_error()){
819                free(step_start);
820                break;
821             }
822
823                 /* compute the number of elapsed pdp_st moments */
824                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
825 #ifdef DEBUG
826                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
827 #endif
828                 if (rra_step_cnt == NULL)
829                 {
830                    rra_step_cnt = (unsigned long *) 
831                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
832                 }
833
834             for(i = 0, rra_start = rra_begin;
835                 i < rrd.stat_head->rra_cnt;
836             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
837                 i++)
838                 {
839                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
840                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
841                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
842         if (start_pdp_offset <= elapsed_pdp_st) {
843            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
844                       rrd.rra_def[i].pdp_cnt + 1;
845             } else {
846                    rra_step_cnt[i] = 0;
847                 }
848
849                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
850                 {
851                    /* If this is a bulk update, we need to skip ahead in the seasonal
852                         * arrays so that they will be correct for the next observed value;
853                         * note that for the bulk update itself, no update will occur to
854                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
855                         * be set to DNAN. */
856            if (rra_step_cnt[i] > 2) 
857                    {
858                           /* skip update by resetting rra_step_cnt[i],
859                            * note that this is not data source specific; this is due
860                            * to the bulk update, not a DNAN value for the specific data
861                            * source. */
862                           rra_step_cnt[i] = 0;
863               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
864                              &last_seasonal_coef);
865                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
866                              &seasonal_coef);
867                    }
868                 
869                   /* periodically run a smoother for seasonal effects */
870                   /* Need to use first cdp parameter buffer to track
871                    * burnin (burnin requires a specific smoothing schedule).
872                    * The CDP_init_seasonal parameter is really an RRA level,
873                    * not a data source within RRA level parameter, but the rra_def
874                    * is read only for rrd_update (not flushed to disk). */
875                   iii = i*(rrd.stat_head -> ds_cnt);
876                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
877                           <= BURNIN_CYCLES)
878                   {
879                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
880                                  > rrd.rra_def[i].row_cnt - 1) {
881                            /* mark off one of the burnin cycles */
882                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
883                        schedule_smooth = 1;
884                          }  
885                   } else {
886                          /* someone has no doubt invented a trick to deal with this
887                           * wrap around, but at least this code is clear. */
888                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
889                              rrd.rra_ptr[i].cur_row)
890                          {
891                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
892                                   * mapping between PDP and CDP */
893                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
894                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
895                                  {
896 #ifdef DEBUG
897                                         fprintf(stderr,
898                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
899                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
900                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
901 #endif
902                                         schedule_smooth = 1;
903                                  }
904              } else {
905                                  /* can't rely on negative numbers because we are working with
906                                   * unsigned values */
907                                  /* Don't need modulus here. If we've wrapped more than once, only
908                                   * one smooth is executed at the end. */
909                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
910                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
911                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
912                                  {
913 #ifdef DEBUG
914                                         fprintf(stderr,
915                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
916                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
917                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
918 #endif
919                                         schedule_smooth = 1;
920                                  }
921                          }
922                   }
923
924               rra_current = ftell(rrd_file); 
925                 } /* if cf is DEVSEASONAL or SEASONAL */
926
927         if (rrd_test_error()) break;
928
929                     /* update CDP_PREP areas */
930                     /* loop over data soures within each RRA */
931                     for(ii = 0;
932                         ii < rrd.stat_head->ds_cnt;
933                         ii++)
934                         {
935                         
936                         /* iii indexes the CDP prep area for this data source within the RRA */
937                         iii=i*rrd.stat_head->ds_cnt+ii;
938
939                         if (rrd.rra_def[i].pdp_cnt > 1) {
940                           
941                            if (rra_step_cnt[i] > 0) {
942                            /* If we are in this block, as least 1 CDP value will be written to
943                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
944                                 * to be written, then the "fill in" value is the CDP_secondary_val
945                                 * entry. */
946                                   if (isnan(pdp_temp[ii]))
947                   {
948                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
949                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
950                                   } else {
951                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
952                                           * CDP data entries. No matter the CF, the value is the same because
953                                           * the average, max, min, and last of a list of identical values is
954                                           * the same, namely, the value itself. */
955                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
956                                   }
957                      
958                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
959                                       > rrd.rra_def[i].pdp_cnt*
960                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
961                                   {
962                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
963                                          /* initialize carry over */
964                                          if (current_cf == CF_AVERAGE) {
965                                                    if (isnan(pdp_temp[ii])) { 
966                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
967                                                    } else {
968                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
969                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
970                                                    }
971                                          } else {
972                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
973                                          }
974                                   } else {
975                                          rrd_value_t cum_val, cur_val; 
976                                      switch (current_cf) {
977                                                 case CF_AVERAGE:
978                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
979                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
980                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
981                                                (cum_val + cur_val * start_pdp_offset) /
982                                            (rrd.rra_def[i].pdp_cnt
983                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
984                                                    /* initialize carry over value */
985                                                    if (isnan(pdp_temp[ii])) { 
986                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
987                                                    } else {
988                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
989                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
990                                                    }
991                                                    break;
992                                                 case CF_MAXIMUM:
993                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
994                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
995 #ifdef DEBUG
996                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
997                                                           isnan(pdp_temp[ii])) {
998                                                      fprintf(stderr,
999                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1000                                                                 i,ii);
1001                                                          exit(-1);
1002                                                   }
1003 #endif
1004                                                   if (cur_val > cum_val)
1005                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1006                                                   else
1007                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1008                                                   /* initialize carry over value */
1009                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1010                                                   break;
1011                                                 case CF_MINIMUM:
1012                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1013                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1014 #ifdef DEBUG
1015                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1016                                                           isnan(pdp_temp[ii])) {
1017                                                      fprintf(stderr,
1018                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1019                                                                 i,ii);
1020                                                          exit(-1);
1021                                                   }
1022 #endif
1023                                                   if (cur_val < cum_val)
1024                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1025                                                   else
1026                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1027                                                   /* initialize carry over value */
1028                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1029                                                   break;
1030                                                 case CF_LAST:
1031                                                 default:
1032                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1033                                                    /* initialize carry over value */
1034                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1035                                                 break;
1036                                          }
1037                                   } /* endif meets xff value requirement for a valid value */
1038                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1039                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1040                                   if (isnan(pdp_temp[ii]))
1041                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1042                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1043                                   else
1044                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1045                } else  /* rra_step_cnt[i]  == 0 */
1046                            {
1047 #ifdef DEBUG
1048                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1049                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1050                                          i,ii);
1051                                   } else {
1052                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1053                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1054                                   }
1055 #endif
1056                                   if (isnan(pdp_temp[ii])) {
1057                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1058                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1059                                   {
1060                                          if (current_cf == CF_AVERAGE) {
1061                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1062                                                    elapsed_pdp_st;
1063                                          } else {
1064                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1065                                          }
1066 #ifdef DEBUG
1067                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1068                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1069 #endif
1070                                   } else {
1071                                          switch (current_cf) {
1072                                          case CF_AVERAGE:
1073                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1074                                                    elapsed_pdp_st;
1075                                                 break;
1076                                          case CF_MINIMUM:
1077                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1078                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1079                                                 break; 
1080                                          case CF_MAXIMUM:
1081                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1082                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083                                                 break; 
1084                                          case CF_LAST:
1085                                          default:
1086                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1087                                                 break;
1088                                          }
1089                                   }
1090                            }
1091                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1092                            if (elapsed_pdp_st > 2)
1093                            {
1094                                    switch (current_cf) {
1095                                    case CF_AVERAGE:
1096                                    default:
1097                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1098                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1099                                           break;
1100                    case CF_SEASONAL:
1101                                    case CF_DEVSEASONAL:
1102                                           /* need to update cached seasonal values, so they are consistent
1103                                            * with the bulk update */
1104                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1105                                            * CDP_last_deviation are the same. */
1106                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1107                                                  last_seasonal_coef[ii];
1108                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1109                                                  seasonal_coef[ii];
1110                                           break;
1111                    case CF_HWPREDICT:
1112                                           /* need to update the null_count and last_null_count.
1113                                            * even do this for non-DNAN pdp_temp because the
1114                                            * algorithm is not learning from batch updates. */
1115                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1116                                                  elapsed_pdp_st;
1117                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1118                                                  elapsed_pdp_st - 1;
1119                                           /* fall through */
1120                                    case CF_DEVPREDICT:
1121                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1122                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1123                                           break;
1124                    case CF_FAILURES:
1125                                           /* do not count missed bulk values as failures */
1126                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1127                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1128                                           /* need to reset violations buffer.
1129                                            * could do this more carefully, but for now, just
1130                                            * assume a bulk update wipes away all violations. */
1131                       erase_violations(&rrd, iii, i);
1132                                           break;
1133                                    }
1134                            } 
1135                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1136
1137                         if (rrd_test_error()) break;
1138
1139                         } /* endif data sources loop */
1140         } /* end RRA Loop */
1141
1142                 /* this loop is only entered if elapsed_pdp_st < 3 */
1143                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1144                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1145                 {
1146                for(i = 0, rra_start = rra_begin;
1147                    i < rrd.stat_head->rra_cnt;
1148                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1149                    i++)
1150                    {
1151                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1152
1153                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1154                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1155                           {
1156                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1157                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1158                                 &seasonal_coef);
1159                  rra_current = ftell(rrd_file);
1160                           }
1161                           if (rrd_test_error()) break;
1162                       /* loop over data soures within each RRA */
1163                       for(ii = 0;
1164                           ii < rrd.stat_head->ds_cnt;
1165                           ii++)
1166                           {
1167                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1168                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1169                                     scratch_idx, seasonal_coef);
1170                           }
1171            } /* end RRA Loop */
1172                    if (rrd_test_error()) break;
1173             } /* end elapsed_pdp_st loop */
1174
1175                 if (rrd_test_error()) break;
1176
1177                 /* Ready to write to disk */
1178                 /* Move sequentially through the file, writing one RRA at a time.
1179                  * Note this architecture divorces the computation of CDP with
1180                  * flushing updated RRA entries to disk. */
1181             for(i = 0, rra_start = rra_begin;
1182                 i < rrd.stat_head->rra_cnt;
1183             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1184                 i++) {
1185                 /* is there anything to write for this RRA? If not, continue. */
1186         if (rra_step_cnt[i] == 0) continue;
1187
1188                 /* write the first row */
1189 #ifdef DEBUG
1190         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1191 #endif
1192             rrd.rra_ptr[i].cur_row++;
1193             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1194                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1195                 /* positition on the first row */
1196                 rra_pos_tmp = rra_start +
1197                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1198                 if(rra_pos_tmp != rra_current) {
1199                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1200                       rrd_set_error("seek error in rrd");
1201                       break;
1202                    }
1203                    rra_current = rra_pos_tmp;
1204                 }
1205
1206 #ifdef DEBUG
1207             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1208 #endif
1209                 scratch_idx = CDP_primary_val;
1210                 if (pcdp_summary != NULL)
1211                 {
1212                    rra_time = (current_time - current_time 
1213                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1214                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1215                 }
1216                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1217                    pcdp_summary, &rra_time);
1218                 if (rrd_test_error()) break;
1219
1220                 /* write other rows of the bulk update, if any */
1221                 scratch_idx = CDP_secondary_val;
1222                 for ( ; rra_step_cnt[i] > 1; 
1223                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1224                 {
1225                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1226                    {
1227 #ifdef DEBUG
1228               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1229                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1230 #endif
1231                           /* wrap */
1232                           rrd.rra_ptr[i].cur_row = 0;
1233                           /* seek back to beginning of current rra */
1234                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1235                           {
1236                          rrd_set_error("seek error in rrd");
1237                          break;
1238                           }
1239 #ifdef DEBUG
1240                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1241 #endif
1242                           rra_current = rra_start;
1243                    }
1244                    if (pcdp_summary != NULL)
1245                    {
1246                       rra_time = (current_time - current_time 
1247                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1248                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1249                    }
1250                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1251                       pcdp_summary, &rra_time);
1252                 }
1253                 
1254                 if (rrd_test_error())
1255                   break;
1256                 } /* RRA LOOP */
1257
1258             /* break out of the argument parsing loop if error_string is set */
1259             if (rrd_test_error()){
1260                    free(step_start);
1261                    break;
1262             } 
1263             
1264         } /* endif a pdp_st has occurred */ 
1265         rrd.live_head->last_up = current_time;
1266         rrd.live_head->last_up_usec = current_time_usec; 
1267         free(step_start);
1268     } /* function argument loop */
1269
1270     if (seasonal_coef != NULL) free(seasonal_coef);
1271     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1272         if (rra_step_cnt != NULL) free(rra_step_cnt);
1273     rpnstack_free(&rpnstack);
1274
1275     /* if we got here and if there is an error and if the file has not been
1276      * written to, then close things up and return. */
1277     if (rrd_test_error()) {
1278         free(updvals);
1279         free(tmpl_idx);
1280         rrd_free(&rrd);
1281         free(pdp_temp);
1282         free(pdp_new);
1283         fclose(rrd_file);
1284         return(-1);
1285     }
1286
1287     /* aargh ... that was tough ... so many loops ... anyway, its done.
1288      * we just need to write back the live header portion now*/
1289
1290     if (fseek(rrd_file, (sizeof(stat_head_t)
1291                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1292                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1293               SEEK_SET) != 0) {
1294         rrd_set_error("seek rrd for live header writeback");
1295         free(updvals);
1296         free(tmpl_idx);
1297         rrd_free(&rrd);
1298         free(pdp_temp);
1299         free(pdp_new);
1300         fclose(rrd_file);
1301         return(-1);
1302     }
1303
1304     if(version >= 3) {
1305             if(fwrite( rrd.live_head,
1306                        sizeof(live_head_t), 1, rrd_file) != 1){
1307                 rrd_set_error("fwrite live_head to rrd");
1308                 free(updvals);
1309                 rrd_free(&rrd);
1310                 free(tmpl_idx);
1311                 free(pdp_temp);
1312                 free(pdp_new);
1313                 fclose(rrd_file);
1314                 return(-1);
1315             }
1316     }
1317     else {
1318             if(fwrite( &rrd.live_head->last_up,
1319                        sizeof(time_t), 1, rrd_file) != 1){
1320                 rrd_set_error("fwrite live_head to rrd");
1321                 free(updvals);
1322                 rrd_free(&rrd);
1323                 free(tmpl_idx);
1324                 free(pdp_temp);
1325                 free(pdp_new);
1326                 fclose(rrd_file);
1327                 return(-1);
1328             }
1329     }
1330             
1331
1332     if(fwrite( rrd.pdp_prep,
1333                sizeof(pdp_prep_t),
1334                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1335         rrd_set_error("ftwrite pdp_prep to rrd");
1336         free(updvals);
1337         rrd_free(&rrd);
1338         free(tmpl_idx);
1339         free(pdp_temp);
1340         free(pdp_new);
1341         fclose(rrd_file);
1342         return(-1);
1343     }
1344
1345     if(fwrite( rrd.cdp_prep,
1346                sizeof(cdp_prep_t),
1347                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1348        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1349
1350         rrd_set_error("ftwrite cdp_prep to rrd");
1351         free(updvals);
1352         free(tmpl_idx);
1353         rrd_free(&rrd);
1354         free(pdp_temp);
1355         free(pdp_new);
1356         fclose(rrd_file);
1357         return(-1);
1358     }
1359
1360     if(fwrite( rrd.rra_ptr,
1361                sizeof(rra_ptr_t), 
1362                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1363         rrd_set_error("fwrite rra_ptr to rrd");
1364         free(updvals);
1365         free(tmpl_idx);
1366         rrd_free(&rrd);
1367         free(pdp_temp);
1368         free(pdp_new);
1369         fclose(rrd_file);
1370         return(-1);
1371     }
1372
1373     /* OK now close the files and free the memory */
1374     if(fclose(rrd_file) != 0){
1375         rrd_set_error("closing rrd");
1376         free(updvals);
1377         free(tmpl_idx);
1378         rrd_free(&rrd);
1379         free(pdp_temp);
1380         free(pdp_new);
1381         return(-1);
1382     }
1383
1384     /* calling the smoothing code here guarantees at most
1385          * one smoothing operation per rrd_update call. Unfortunately,
1386          * it is possible with bulk updates, or a long-delayed update
1387          * for smoothing to occur off-schedule. This really isn't
1388          * critical except during the burning cycles. */
1389         if (schedule_smooth)
1390         {
1391 #ifndef WIN32
1392           rrd_file = fopen(filename,"r+");
1393 #else
1394           rrd_file = fopen(filename,"rb+");
1395 #endif
1396           rra_start = rra_begin;
1397           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1398           {
1399             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1400                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1401             {
1402 #ifdef DEBUG
1403               fprintf(stderr,"Running smoother for rra %ld\n",i);
1404 #endif
1405               apply_smoother(&rrd,i,rra_start,rrd_file);
1406               if (rrd_test_error())
1407                 break;
1408             }
1409             rra_start += rrd.rra_def[i].row_cnt
1410               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1411           }
1412           fclose(rrd_file);
1413         }
1414     rrd_free(&rrd);
1415     free(updvals);
1416     free(tmpl_idx);
1417     free(pdp_new);
1418     free(pdp_temp);
1419     return(0);
1420 }
1421
1422 /*
1423  * get exclusive lock to whole file.
1424  * lock gets removed when we close the file
1425  *
1426  * returns 0 on success
1427  */
1428 int
1429 LockRRD(FILE *rrdfile)
1430 {
1431     int rrd_fd;         /* File descriptor for RRD */
1432     int                 stat;
1433
1434     rrd_fd = fileno(rrdfile);
1435
1436         {
1437 #ifndef WIN32    
1438                 struct flock    lock;
1439     lock.l_type = F_WRLCK;    /* exclusive write lock */
1440     lock.l_len = 0;           /* whole file */
1441     lock.l_start = 0;         /* start of file */
1442     lock.l_whence = SEEK_SET;   /* end of file */
1443
1444     stat = fcntl(rrd_fd, F_SETLK, &lock);
1445 #else
1446                 struct _stat st;
1447
1448                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1449                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1450                 } else {
1451                         stat = -1;
1452                 }
1453 #endif
1454         }
1455
1456     return(stat);
1457 }
1458
1459
1460 info_t
1461 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1462                unsigned short CDP_scratch_idx, FILE *rrd_file,
1463                    info_t *pcdp_summary, time_t *rra_time)
1464 {
1465    unsigned long ds_idx, cdp_idx;
1466    infoval iv;
1467   
1468    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1469    {
1470       /* compute the cdp index */
1471       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1472 #ifdef DEBUG
1473           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1474              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1475              rrd -> rra_def[rra_idx].cf_nam);
1476 #endif 
1477       if (pcdp_summary != NULL)
1478           {
1479              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1480              /* append info to the return hash */
1481                  pcdp_summary = info_push(pcdp_summary,
1482                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1483                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1484                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1485          RD_I_VAL, iv);
1486           }
1487           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1488                  sizeof(rrd_value_t),1,rrd_file) != 1)
1489           { 
1490              rrd_set_error("writing rrd");
1491              return 0;
1492           }
1493           *rra_current += sizeof(rrd_value_t);
1494         }
1495         return (pcdp_summary);
1496 }