be pickier about what we accept in rrd_update. Complain if things do not work out
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.11  2003/09/02 21:58:35  oetiker
9  * be pickier about what we accept in rrd_update. Complain if things do not work out
10  *
11  * Revision 1.10  2003/04/29 19:14:12  jake
12  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
13  * Also revert accidental addition of -I to aclocal MakeMakefile.
14  *
15  * Revision 1.9  2003/04/25 18:35:08  jake
16  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
17  *
18  * Revision 1.8  2003/03/31 21:22:12  oetiker
19  * enables RRDtool updates with microsecond or in case of windows millisecond
20  * precision. This is needed to reduce time measurement error when archive step
21  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
22  *
23  * Revision 1.7  2003/02/13 07:05:27  oetiker
24  * Find attached the patch I promised to send to you. Please note that there
25  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
26  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
27  * library is identical to librrd, but it contains support code for per-thread
28  * global variables currently used for error information only. This is similar
29  * to how errno per-thread variables are implemented.  librrd_th must be linked
30  * alongside of libpthred
31  *
32  * There is also a new file "THREADS", holding some documentation.
33  *
34  * -- Peter Stamfest <peter@stamfest.at>
35  *
36  * Revision 1.6  2002/02/01 20:34:49  oetiker
37  * fixed version number and date/time
38  *
39  * Revision 1.5  2001/05/09 05:31:01  oetiker
40  * Bug fix: when update of multiple PDP/CDP RRAs coincided
41  * with interpolation of multiple PDPs an incorrect value was
42  * stored as the CDP. Especially evident for GAUGE data sources.
43  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
44  *
45  * Revision 1.4  2001/03/10 23:54:41  oetiker
46  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
47  * parser and calculator from rrd_graph and puts then in a new file,
48  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
49  * clean-up of aberrant behavior stuff, including a bug fix.
50  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
51  * -- Jake Brutlag <jakeb@corp.webtv.net>
52  *
53  * Revision 1.3  2001/03/04 13:01:55  oetiker
54  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
55  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
56  * This is backwards compatible! But new files using the Aberrant stuff are not readable
57  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
58  * -- Jake Brutlag <jakeb@corp.webtv.net>
59  *
60  * Revision 1.2  2001/03/04 11:14:25  oetiker
61  * added at-style-time@value:value syntax to rrd_update
62  * --  Dave Bodenstab <imdave@mcs.net>
63  *
64  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
65  * checkin
66  *
67  *****************************************************************************/
68
69 #include "rrd_tool.h"
70 #include <sys/types.h>
71 #include <fcntl.h>
72
73 #ifdef WIN32
74  #include <sys/locking.h>
75  #include <sys/stat.h>
76  #include <io.h>
77 #endif
78
79 #include "rrd_hw.h"
80 #include "rrd_rpncalc.h"
81
82 #include "rrd_is_thread_safe.h"
83
84 #ifdef WIN32
85 /*
86  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
87  * replacement.
88  */
89 #include <sys/timeb.h>
90
91 struct timeval {
92         time_t tv_sec; /* seconds */
93         long tv_usec;  /* microseconds */
94 };
95
96 struct __timezone {
97         int  tz_minuteswest; /* minutes W of Greenwich */
98         int  tz_dsttime;     /* type of dst correction */
99 };
100
101 static gettimeofday(struct timeval *t, struct __timezone *tz) {
102         
103         struct timeb current_time;
104
105         _ftime(&current_time);
106         
107         t->tv_sec  = current_time.time;
108         t->tv_usec = current_time.millitm * 1000;
109 }
110
111 #endif
112 /*
113  * normilize time as returned by gettimeofday. usec part must
114  * be always >= 0
115  */
116 static void normalize_time(struct timeval *t)
117 {
118         if(t->tv_usec < 0) {
119                 t->tv_sec--;
120                 t->tv_usec += 1000000L;
121         }
122 }
123
124 /* Local prototypes */
125 int LockRRD(FILE *rrd_file);
126 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
127                                         unsigned long *rra_current,
128                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
129                                         info_t *pcdp_summary, time_t *rra_time);
130 int rrd_update_r(char *filename, char *template, int argc, char **argv);
131 int _rrd_update(char *filename, char *template, int argc, char **argv, 
132                                         info_t*);
133
134 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
135
136
137 #ifdef STANDALONE
138 int 
139 main(int argc, char **argv){
140         rrd_update(argc,argv);
141         if (rrd_test_error()) {
142                 printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
143                         "Usage: rrdupdate filename\n"
144                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
145                         "\t\t\ttime|N:value[:value...]\n\n"
146                         "\t\t\tat-time@value[:value...]\n\n"
147                         "\t\t\t[ time:value[:value...] ..]\n\n");
148                                    
149                 printf("ERROR: %s\n",rrd_get_error());
150                 rrd_clear_error();                                                            
151                 return 1;
152         }
153         return 0;
154 }
155 #endif
156
157 info_t *rrd_update_v(int argc, char **argv)
158 {
159     char             *template = NULL;          
160         info_t *result = NULL;
161         infoval rc;
162
163     while (1) {
164                 static struct option long_options[] =
165                         {
166                                 {"template",      required_argument, 0, 't'},
167                                 {0,0,0,0}
168                         };
169                 int option_index = 0;
170                 int opt;
171                 opt = getopt_long(argc, argv, "t:", 
172                                                   long_options, &option_index);
173                 
174                 if (opt == EOF)
175                         break;
176                 
177                 switch(opt) {
178                 case 't':
179                         template = optarg;
180                         break;
181                 
182                 case '?':
183                         rrd_set_error("unknown option '%s'",argv[optind-1]);
184             rc.u_int = -1;
185                         goto end_tag;
186                 }
187     }
188
189     /* need at least 2 arguments: filename, data. */
190     if (argc-optind < 2) {
191                 rrd_set_error("Not enough arguments");
192         rc.u_int = -1;
193                 goto end_tag;
194     }
195     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
196         rc.u_int = _rrd_update(argv[optind], template,
197                       argc - optind - 1, argv + optind + 1, result);
198     result->value.u_int = rc.u_int;
199 end_tag:
200     return result;
201 }
202
203 int
204 rrd_update(int argc, char **argv)
205 {
206     char             *template = NULL;          
207     int rc;
208
209     while (1) {
210                 static struct option long_options[] =
211                         {
212                                 {"template",      required_argument, 0, 't'},
213                                 {0,0,0,0}
214                         };
215                 int option_index = 0;
216                 int opt;
217                 opt = getopt_long(argc, argv, "t:", 
218                                                   long_options, &option_index);
219                 
220                 if (opt == EOF)
221                         break;
222                 
223                 switch(opt) {
224                 case 't':
225                         template = optarg;
226                         break;
227                 
228                 case '?':
229                         rrd_set_error("unknown option '%s'",argv[optind-1]);
230                         return(-1);
231                 }
232     }
233
234     /* need at least 2 arguments: filename, data. */
235     if (argc-optind < 2) {
236                 rrd_set_error("Not enough arguments");
237
238                 return -1;
239     }
240  
241         rc = rrd_update_r(argv[optind], template,
242                       argc - optind - 1, argv + optind + 1);
243     return rc;
244 }
245
246 int
247 rrd_update_r(char *filename, char *template, int argc, char **argv)
248 {
249    return _rrd_update(filename, template, argc, argv, NULL);
250 }
251
252 int
253 _rrd_update(char *filename, char *template, int argc, char **argv, 
254    info_t *pcdp_summary)
255 {
256
257     int              arg_i = 2;
258     short            j;
259     unsigned long    i,ii,iii=1;
260
261     unsigned long    rra_begin;          /* byte pointer to the rra
262                                           * area in the rrd file.  this
263                                           * pointer never changes value */
264     unsigned long    rra_start;          /* byte pointer to the rra
265                                           * area in the rrd file.  this
266                                           * pointer changes as each rrd is
267                                           * processed. */
268     unsigned long    rra_current;        /* byte pointer to the current write
269                                           * spot in the rrd file. */
270     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
271     double           interval,
272         pre_int,post_int;                /* interval between this and
273                                           * the last run */
274     unsigned long    proc_pdp_st;        /* which pdp_st was the last
275                                           * to be processed */
276     unsigned long    occu_pdp_st;        /* when was the pdp_st
277                                           * before the last update
278                                           * time */
279     unsigned long    proc_pdp_age;       /* how old was the data in
280                                           * the pdp prep area when it
281                                           * was last updated */
282     unsigned long    occu_pdp_age;       /* how long ago was the last
283                                           * pdp_step time */
284     rrd_value_t      *pdp_new;           /* prepare the incoming data
285                                           * to be added the the
286                                           * existing entry */
287     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
288                                           * to be added the the
289                                           * cdp values */
290
291     long             *tmpl_idx;          /* index representing the settings
292                                             transported by the template index */
293     unsigned long    tmpl_cnt = 2;       /* time and data */
294
295     FILE             *rrd_file;
296     rrd_t            rrd;
297     time_t           current_time;
298         time_t           rra_time; /* time of update for a RRA */
299     unsigned long    current_time_usec;  /* microseconds part of current time */
300     struct timeval   tmp_time;           /* used for time conversion */
301
302     char             **updvals;
303     int              schedule_smooth = 0;
304         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
305                                          /* a vector of future Holt-Winters seasonal coefs */
306     unsigned long    elapsed_pdp_st;
307                                          /* number of elapsed PDP steps since last update */
308     unsigned long    *rra_step_cnt = NULL;
309                                          /* number of rows to be updated in an RRA for a data
310                                           * value. */
311     unsigned long    start_pdp_offset;
312                                          /* number of PDP steps since the last update that
313                                           * are assigned to the first CDP to be generated
314                                           * since the last update. */
315     unsigned short   scratch_idx;
316                                          /* index into the CDP scratch array */
317     enum cf_en       current_cf;
318                                          /* numeric id of the current consolidation function */
319     rpnstack_t       rpnstack; /* used for COMPUTE DS */
320     int              version;  /* rrd version */
321     char             *endptr; /* used in the conversion */
322
323     rpnstack_init(&rpnstack);
324
325     /* need at least 1 arguments: data. */
326     if (argc < 1) {
327         rrd_set_error("Not enough arguments");
328         return -1;
329     }
330     
331     
332
333     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
334         return -1;
335     }
336     /* initialize time */
337     version = atoi(rrd.stat_head->version);
338     gettimeofday(&tmp_time, 0);
339     normalize_time(&tmp_time);
340     current_time = tmp_time.tv_sec;
341     if(version >= 3) {
342         current_time_usec = tmp_time.tv_usec;
343     }
344     else {
345         current_time_usec = 0;
346     }
347
348     rra_current = rra_start = rra_begin = ftell(rrd_file);
349     /* This is defined in the ANSI C standard, section 7.9.5.3:
350
351         When a file is opened with udpate mode ('+' as the second
352         or third character in the ... list of mode argument
353         variables), both input and ouptut may be performed on the
354         associated stream.  However, ...  input may not be directly
355         followed by output without an intervening call to a file
356         positioning function, unless the input oepration encounters
357         end-of-file. */
358     fseek(rrd_file, 0, SEEK_CUR);
359
360     
361     /* get exclusive lock to whole file.
362      * lock gets removed when we close the file.
363      */
364     if (LockRRD(rrd_file) != 0) {
365       rrd_set_error("could not lock RRD");
366       rrd_free(&rrd);
367       fclose(rrd_file);
368       return(-1);   
369     } 
370
371     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
372         rrd_set_error("allocating updvals pointer array");
373         rrd_free(&rrd);
374         fclose(rrd_file);
375         return(-1);
376     }
377
378     if ((pdp_temp = malloc(sizeof(rrd_value_t)
379                            *rrd.stat_head->ds_cnt))==NULL){
380         rrd_set_error("allocating pdp_temp ...");
381         free(updvals);
382         rrd_free(&rrd);
383         fclose(rrd_file);
384         return(-1);
385     }
386
387     if ((tmpl_idx = malloc(sizeof(unsigned long)
388                            *(rrd.stat_head->ds_cnt+1)))==NULL){
389         rrd_set_error("allocating tmpl_idx ...");
390         free(pdp_temp);
391         free(updvals);
392         rrd_free(&rrd);
393         fclose(rrd_file);
394         return(-1);
395     }
396     /* initialize template redirector */
397     /* default config example (assume DS 1 is a CDEF DS)
398        tmpl_idx[0] -> 0; (time)
399        tmpl_idx[1] -> 1; (DS 0)
400        tmpl_idx[2] -> 3; (DS 2)
401        tmpl_idx[3] -> 4; (DS 3) */
402     tmpl_idx[0] = 0; /* time */
403     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
404         {
405            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
406               tmpl_idx[ii++]=i;
407         }
408     tmpl_cnt= ii;
409
410     if (template) {
411         char *dsname;
412         unsigned int tmpl_len;
413         dsname = template;
414         tmpl_cnt = 1; /* the first entry is the time */
415         tmpl_len = strlen(template);
416         for(i=0;i<=tmpl_len ;i++) {
417             if (template[i] == ':' || template[i] == '\0') {
418                 template[i] = '\0';
419                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
420                     rrd_set_error("Template contains more DS definitions than RRD");
421                     free(updvals); free(pdp_temp);
422                     free(tmpl_idx); rrd_free(&rrd);
423                     fclose(rrd_file); return(-1);
424                 }
425                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
426                     rrd_set_error("unknown DS name '%s'",dsname);
427                     free(updvals); free(pdp_temp);
428                     free(tmpl_idx); rrd_free(&rrd);
429                     fclose(rrd_file); return(-1);
430                 } else {
431                   /* the first element is always the time */
432                   tmpl_idx[tmpl_cnt-1]++; 
433                   /* go to the next entry on the template */
434                   dsname = &template[i+1];
435                   /* fix the damage we did before */
436                   if (i<tmpl_len) {
437                      template[i]=':';
438                   } 
439
440                 }
441             }       
442         }
443     }
444     if ((pdp_new = malloc(sizeof(rrd_value_t)
445                           *rrd.stat_head->ds_cnt))==NULL){
446         rrd_set_error("allocating pdp_new ...");
447         free(updvals);
448         free(pdp_temp);
449         free(tmpl_idx);
450         rrd_free(&rrd);
451         fclose(rrd_file);
452         return(-1);
453     }
454
455     /* loop through the arguments. */
456     for(arg_i=0; arg_i<argc;arg_i++) {
457         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
458         char *step_start = stepper;
459         char *p;
460         char *parsetime_error = NULL;
461         enum {atstyle, normal} timesyntax;
462         struct time_value ds_tv;
463         if (stepper == NULL){
464                 rrd_set_error("failed duplication argv entry");
465                 free(updvals);
466                 free(pdp_temp);  
467                 free(tmpl_idx);
468                 rrd_free(&rrd);
469                 fclose(rrd_file);
470                 return(-1);
471          }
472         /* initialize all ds input to unknown except the first one
473            which has always got to be set */
474         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
475         strcpy(stepper,argv[arg_i]);
476         updvals[0]=stepper;
477         /* separate all ds elements; first must be examined separately
478            due to alternate time syntax */
479         if ((p=strchr(stepper,'@'))!=NULL) {
480             timesyntax = atstyle;
481             *p = '\0';
482             stepper = p+1;
483         } else if ((p=strchr(stepper,':'))!=NULL) {
484             timesyntax = normal;
485             *p = '\0';
486             stepper = p+1;
487         } else {
488             rrd_set_error("expected timestamp not found in data source from %s:...",
489                           argv[arg_i]);
490             free(step_start);
491             break;
492         }
493         ii=1;
494         updvals[tmpl_idx[ii]] = stepper;
495         while (*stepper) {
496             if (*stepper == ':') {
497                 *stepper = '\0';
498                 ii++;
499                 if (ii<tmpl_cnt){                   
500                     updvals[tmpl_idx[ii]] = stepper+1;
501                 }
502             }
503             stepper++;
504         }
505
506         if (ii != tmpl_cnt-1) {
507             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
508                           tmpl_cnt-1, ii, argv[arg_i]);
509             free(step_start);
510             break;
511         }
512         
513         /* get the time from the reading ... handle N */
514         if (timesyntax == atstyle) {
515             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
516                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
517                 free(step_start);
518                 break;
519             }
520             if (ds_tv.type == RELATIVE_TO_END_TIME ||
521                 ds_tv.type == RELATIVE_TO_START_TIME) {
522                 rrd_set_error("specifying time relative to the 'start' "
523                               "or 'end' makes no sense here: %s",
524                               updvals[0]);
525                 free(step_start);
526                 break;
527             }
528
529             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
530             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
531             
532         } else if (strcmp(updvals[0],"N")==0){
533             gettimeofday(&tmp_time, 0);
534             normalize_time(&tmp_time);
535             current_time = tmp_time.tv_sec;
536             current_time_usec = tmp_time.tv_usec;
537         } else {
538             double tmp;
539             tmp = strtod(updvals[0], 0);
540             current_time = floor(tmp);
541             current_time_usec = (long)((tmp - current_time) * 1000000L);
542         }
543         /* dont do any correction for old version RRDs */
544         if(version < 3) 
545             current_time_usec = 0;
546         
547         if(current_time <= rrd.live_head->last_up){
548             rrd_set_error("illegal attempt to update using time %ld when "
549                           "last update time is %ld (minimum one second step)",
550                           current_time, rrd.live_head->last_up);
551             free(step_start);
552             break;
553         }
554         
555         
556         /* seek to the beginning of the rra's */
557         if (rra_current != rra_begin) {
558             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
559                 rrd_set_error("seek error in rrd");
560                 free(step_start);
561                 break;
562             }
563             rra_current = rra_begin;
564         }
565         rra_start = rra_begin;
566
567         /* when was the current pdp started */
568         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
570
571         /* when did the last pdp_st occur */
572         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573         occu_pdp_st = current_time - occu_pdp_age;
574         /* interval = current_time - rrd.live_head->last_up; */
575         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
576     
577         if (occu_pdp_st > proc_pdp_st){
578             /* OK we passed the pdp_st moment*/
579             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
580                                                               * occurred before the latest
581                                                               * pdp_st moment*/
582             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
583             post_int = occu_pdp_age;                         /* how much after it */
584             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
585         } else {
586             pre_int = interval;
587             post_int = 0;
588         }
589
590 #ifdef DEBUG
591         printf(
592                "proc_pdp_age %lu\t"
593                "proc_pdp_st %lu\t" 
594                "occu_pfp_age %lu\t" 
595                "occu_pdp_st %lu\t"
596                "int %lf\t"
597                "pre_int %lf\t"
598                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
599                 occu_pdp_age, occu_pdp_st,
600                interval, pre_int, post_int);
601 #endif
602     
603         /* process the data sources and update the pdp_prep 
604          * area accordingly */
605         for(i=0;i<rrd.stat_head->ds_cnt;i++){
606             enum dst_en dst_idx;
607             dst_idx= dst_conv(rrd.ds_def[i].dst);
608                 /* NOTE: DST_CDEF should never enter this if block, because
609                  * updvals[i+1][0] is initialized to 'U'; unless the caller
610                  * accidently specified a value for the DST_CDEF. To handle 
611                  * this case, an extra check is required. */
612             if((updvals[i+1][0] != 'U') &&
613                    (dst_idx != DST_CDEF) &&
614                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
615                double rate = DNAN;
616                /* the data source type defines how to process the data */
617                 /* pdp_new contains rate * time ... eg the bytes
618                  * transferred during the interval. Doing it this way saves
619                  * a lot of math operations */
620                 
621
622                 switch(dst_idx){
623                 case DST_COUNTER:
624                 case DST_DERIVE:
625                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
626                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
627                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii=0 && updvals[i+1][ii] == '-')){
628                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
629                                  break;
630                             }
631                        }
632                        if (rrd_test_error()){
633                             break;
634                        }
635                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636                        if(dst_idx == DST_COUNTER) {
637                           /* simple overflow catcher sugestet by andres kroonmaa */
638                           /* this will fail terribly for non 32 or 64 bit counters ... */
639                           /* are there any others in SNMP land ? */
640                           if (pdp_new[i] < (double)0.0 ) 
641                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
642                           if (pdp_new[i] < (double)0.0 ) 
643                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
644                        }
645                        rate = pdp_new[i] / interval;
646                     }
647                    else {
648                      pdp_new[i]= DNAN;          
649                    }
650                    break;
651                 case DST_ABSOLUTE:
652                     errno = 0;
653                     pdp_new[i] = strtod(updvals[i+1],&endptr);
654                     if (errno > 0){
655                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656                         break;
657                     };
658                     if (endptr[0] != '\0'){
659                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660                         break;
661                     }
662                     rate = pdp_new[i] / interval;                 
663                     break;
664                 case DST_GAUGE:
665                     errno = 0;
666                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
667                     if (errno > 0){
668                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669                         break;
670                     };
671                     if (endptr[0] != '\0'){
672                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673                         break;
674                     }
675                     rate = pdp_new[i] / interval;                  
676                     break;
677                 default:
678                     rrd_set_error("rrd contains unknown DS type : '%s'",
679                                   rrd.ds_def[i].dst);
680                     break;
681                 }
682                 /* break out of this for loop if the error string is set */
683                 if (rrd_test_error()){
684                     break;
685                 }
686                /* make sure pdp_temp is neither too large or too small
687                 * if any of these occur it becomes unknown ...
688                 * sorry folks ... */
689                if ( ! isnan(rate) && 
690                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
692                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
694                   pdp_new[i] = DNAN;
695                }               
696             } else {
697                 /* no news is news all the same */
698                 pdp_new[i] = DNAN;
699             }
700             
701             /* make a copy of the command line argument for the next run */
702 #ifdef DEBUG
703             fprintf(stderr,
704                     "prep ds[%lu]\t"
705                     "last_arg '%s'\t"
706                     "this_arg '%s'\t"
707                     "pdp_new %10.2f\n",
708                     i,
709                     rrd.pdp_prep[i].last_ds,
710                     updvals[i+1], pdp_new[i]);
711 #endif
712             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
713                 strncpy(rrd.pdp_prep[i].last_ds,
714                         updvals[i+1],LAST_DS_LEN-1);
715                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
716             }
717         }
718         /* break out of the argument parsing loop if the error_string is set */
719         if (rrd_test_error()){
720             free(step_start);
721             break;
722         }
723         /* has a pdp_st moment occurred since the last run ? */
724
725         if (proc_pdp_st == occu_pdp_st){
726             /* no we have not passed a pdp_st moment. therefore update is simple */
727
728             for(i=0;i<rrd.stat_head->ds_cnt;i++){
729                 if(isnan(pdp_new[i]))
730                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
731                 else
732                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
733 #ifdef DEBUG
734                 fprintf(stderr,
735                         "NO PDP  ds[%lu]\t"
736                         "value %10.2f\t"
737                         "unkn_sec %5lu\n",
738                         i,
739                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
740                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
741 #endif
742             }   
743         } else {
744             /* an pdp_st has occurred. */
745
746             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
747              * occurred up to the last run.        
748             pdp_new[] contains rate*seconds from the latest run.
749             pdp_temp[] will contain the rate for cdp */
750
751             for(i=0;i<rrd.stat_head->ds_cnt;i++){
752                 /* update pdp_prep to the current pdp_st */
753                 if(isnan(pdp_new[i]))
754                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
755                 else
756                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
757                         pdp_new[i]/(double)interval*(double)pre_int;
758
759                 /* if too much of the pdp_prep is unknown we dump it */
760                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
761                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
762                     (occu_pdp_st-proc_pdp_st <= 
763                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
764                     pdp_temp[i] = DNAN;
765                 } else {
766                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
767                         / (double)( occu_pdp_st
768                                    - proc_pdp_st
769                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
770                 }
771
772                 /* process CDEF data sources; remember each CDEF DS can
773                  * only reference other DS with a lower index number */
774             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
775                    rpnp_t *rpnp;
776                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
777                    /* substitue data values for OP_VARIABLE nodes */
778                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
779                    {
780                           if (rpnp[ii].op == OP_VARIABLE) {
781                                  rpnp[ii].op = OP_NUMBER;
782                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
783                           }
784                    }
785                    /* run the rpn calculator */
786                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
787                           free(rpnp);
788                           break; /* exits the data sources pdp_temp loop */
789                    }
790                 }
791         
792                 /* make pdp_prep ready for the next run */
793                 if(isnan(pdp_new[i])){
794                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
795                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
796                 } else {
797                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
798                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
799                         pdp_new[i]/(double)interval*(double)post_int;
800                 }
801
802 #ifdef DEBUG
803                 fprintf(stderr,
804                         "PDP UPD ds[%lu]\t"
805                         "pdp_temp %10.2f\t"
806                         "new_prep %10.2f\t"
807                         "new_unkn_sec %5lu\n",
808                         i, pdp_temp[i],
809                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
810                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
811 #endif
812             }
813
814                 /* if there were errors during the last loop, bail out here */
815             if (rrd_test_error()){
816                free(step_start);
817                break;
818             }
819
820                 /* compute the number of elapsed pdp_st moments */
821                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
822 #ifdef DEBUG
823                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
824 #endif
825                 if (rra_step_cnt == NULL)
826                 {
827                    rra_step_cnt = (unsigned long *) 
828                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
829                 }
830
831             for(i = 0, rra_start = rra_begin;
832                 i < rrd.stat_head->rra_cnt;
833             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
834                 i++)
835                 {
836                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
837                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
838                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
839         if (start_pdp_offset <= elapsed_pdp_st) {
840            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
841                       rrd.rra_def[i].pdp_cnt + 1;
842             } else {
843                    rra_step_cnt[i] = 0;
844                 }
845
846                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
847                 {
848                    /* If this is a bulk update, we need to skip ahead in the seasonal
849                         * arrays so that they will be correct for the next observed value;
850                         * note that for the bulk update itself, no update will occur to
851                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
852                         * be set to DNAN. */
853            if (rra_step_cnt[i] > 2) 
854                    {
855                           /* skip update by resetting rra_step_cnt[i],
856                            * note that this is not data source specific; this is due
857                            * to the bulk update, not a DNAN value for the specific data
858                            * source. */
859                           rra_step_cnt[i] = 0;
860               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
861                              &last_seasonal_coef);
862                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
863                              &seasonal_coef);
864                    }
865                 
866                   /* periodically run a smoother for seasonal effects */
867                   /* Need to use first cdp parameter buffer to track
868                    * burnin (burnin requires a specific smoothing schedule).
869                    * The CDP_init_seasonal parameter is really an RRA level,
870                    * not a data source within RRA level parameter, but the rra_def
871                    * is read only for rrd_update (not flushed to disk). */
872                   iii = i*(rrd.stat_head -> ds_cnt);
873                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
874                           <= BURNIN_CYCLES)
875                   {
876                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
877                                  > rrd.rra_def[i].row_cnt - 1) {
878                            /* mark off one of the burnin cycles */
879                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
880                        schedule_smooth = 1;
881                          }  
882                   } else {
883                          /* someone has no doubt invented a trick to deal with this
884                           * wrap around, but at least this code is clear. */
885                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
886                              rrd.rra_ptr[i].cur_row)
887                          {
888                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
889                                   * mapping between PDP and CDP */
890                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
891                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
892                                  {
893 #ifdef DEBUG
894                                         fprintf(stderr,
895                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
896                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
897                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
898 #endif
899                                         schedule_smooth = 1;
900                                  }
901              } else {
902                                  /* can't rely on negative numbers because we are working with
903                                   * unsigned values */
904                                  /* Don't need modulus here. If we've wrapped more than once, only
905                                   * one smooth is executed at the end. */
906                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
907                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
908                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
909                                  {
910 #ifdef DEBUG
911                                         fprintf(stderr,
912                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
913                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
914                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
915 #endif
916                                         schedule_smooth = 1;
917                                  }
918                          }
919                   }
920
921               rra_current = ftell(rrd_file); 
922                 } /* if cf is DEVSEASONAL or SEASONAL */
923
924         if (rrd_test_error()) break;
925
926                     /* update CDP_PREP areas */
927                     /* loop over data soures within each RRA */
928                     for(ii = 0;
929                         ii < rrd.stat_head->ds_cnt;
930                         ii++)
931                         {
932                         
933                         /* iii indexes the CDP prep area for this data source within the RRA */
934                         iii=i*rrd.stat_head->ds_cnt+ii;
935
936                         if (rrd.rra_def[i].pdp_cnt > 1) {
937                           
938                            if (rra_step_cnt[i] > 0) {
939                            /* If we are in this block, as least 1 CDP value will be written to
940                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
941                                 * to be written, then the "fill in" value is the CDP_secondary_val
942                                 * entry. */
943                                   if (isnan(pdp_temp[ii]))
944                   {
945                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
946                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
947                                   } else {
948                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
949                                           * CDP data entries. No matter the CF, the value is the same because
950                                           * the average, max, min, and last of a list of identical values is
951                                           * the same, namely, the value itself. */
952                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
953                                   }
954                      
955                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
956                                       > rrd.rra_def[i].pdp_cnt*
957                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
958                                   {
959                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
960                                          /* initialize carry over */
961                                          if (current_cf == CF_AVERAGE) {
962                                                    if (isnan(pdp_temp[ii])) { 
963                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
964                                                    } else {
965                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
966                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
967                                                    }
968                                          } else {
969                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
970                                          }
971                                   } else {
972                                          rrd_value_t cum_val, cur_val; 
973                                      switch (current_cf) {
974                                                 case CF_AVERAGE:
975                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
976                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
977                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
978                                                (cum_val + cur_val * start_pdp_offset) /
979                                            (rrd.rra_def[i].pdp_cnt
980                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
981                                                    /* initialize carry over value */
982                                                    if (isnan(pdp_temp[ii])) { 
983                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
984                                                    } else {
985                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
987                                                    }
988                                                    break;
989                                                 case CF_MAXIMUM:
990                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
991                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
992 #ifdef DEBUG
993                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
994                                                           isnan(pdp_temp[ii])) {
995                                                      fprintf(stderr,
996                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
997                                                                 i,ii);
998                                                          exit(-1);
999                                                   }
1000 #endif
1001                                                   if (cur_val > cum_val)
1002                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1003                                                   else
1004                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1005                                                   /* initialize carry over value */
1006                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1007                                                   break;
1008                                                 case CF_MINIMUM:
1009                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1010                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1011 #ifdef DEBUG
1012                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1013                                                           isnan(pdp_temp[ii])) {
1014                                                      fprintf(stderr,
1015                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1016                                                                 i,ii);
1017                                                          exit(-1);
1018                                                   }
1019 #endif
1020                                                   if (cur_val < cum_val)
1021                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1022                                                   else
1023                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1024                                                   /* initialize carry over value */
1025                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1026                                                   break;
1027                                                 case CF_LAST:
1028                                                 default:
1029                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1030                                                    /* initialize carry over value */
1031                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1032                                                 break;
1033                                          }
1034                                   } /* endif meets xff value requirement for a valid value */
1035                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1036                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1037                                   if (isnan(pdp_temp[ii]))
1038                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1039                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1040                                   else
1041                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1042                } else  /* rra_step_cnt[i]  == 0 */
1043                            {
1044 #ifdef DEBUG
1045                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1046                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1047                                          i,ii);
1048                                   } else {
1049                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1050                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1051                                   }
1052 #endif
1053                                   if (isnan(pdp_temp[ii])) {
1054                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1055                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1056                                   {
1057                                          if (current_cf == CF_AVERAGE) {
1058                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1059                                                    elapsed_pdp_st;
1060                                          } else {
1061                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062                                          }
1063 #ifdef DEBUG
1064                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1065                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066 #endif
1067                                   } else {
1068                                          switch (current_cf) {
1069                                          case CF_AVERAGE:
1070                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1071                                                    elapsed_pdp_st;
1072                                                 break;
1073                                          case CF_MINIMUM:
1074                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1075                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1076                                                 break; 
1077                                          case CF_MAXIMUM:
1078                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1079                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1080                                                 break; 
1081                                          case CF_LAST:
1082                                          default:
1083                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1084                                                 break;
1085                                          }
1086                                   }
1087                            }
1088                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1089                            if (elapsed_pdp_st > 2)
1090                            {
1091                                    switch (current_cf) {
1092                                    case CF_AVERAGE:
1093                                    default:
1094                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1095                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1096                                           break;
1097                    case CF_SEASONAL:
1098                                    case CF_DEVSEASONAL:
1099                                           /* need to update cached seasonal values, so they are consistent
1100                                            * with the bulk update */
1101                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1102                                            * CDP_last_deviation are the same. */
1103                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1104                                                  last_seasonal_coef[ii];
1105                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1106                                                  seasonal_coef[ii];
1107                                           break;
1108                    case CF_HWPREDICT:
1109                                           /* need to update the null_count and last_null_count.
1110                                            * even do this for non-DNAN pdp_temp because the
1111                                            * algorithm is not learning from batch updates. */
1112                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1113                                                  elapsed_pdp_st;
1114                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1115                                                  elapsed_pdp_st - 1;
1116                                           /* fall through */
1117                                    case CF_DEVPREDICT:
1118                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1119                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1120                                           break;
1121                    case CF_FAILURES:
1122                                           /* do not count missed bulk values as failures */
1123                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1124                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1125                                           /* need to reset violations buffer.
1126                                            * could do this more carefully, but for now, just
1127                                            * assume a bulk update wipes away all violations. */
1128                       erase_violations(&rrd, iii, i);
1129                                           break;
1130                                    }
1131                            } 
1132                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1133
1134                         if (rrd_test_error()) break;
1135
1136                         } /* endif data sources loop */
1137         } /* end RRA Loop */
1138
1139                 /* this loop is only entered if elapsed_pdp_st < 3 */
1140                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1141                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1142                 {
1143                for(i = 0, rra_start = rra_begin;
1144                    i < rrd.stat_head->rra_cnt;
1145                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1146                    i++)
1147                    {
1148                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1149
1150                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1151                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1152                           {
1153                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1154                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1155                                 &seasonal_coef);
1156                  rra_current = ftell(rrd_file);
1157                           }
1158                           if (rrd_test_error()) break;
1159                       /* loop over data soures within each RRA */
1160                       for(ii = 0;
1161                           ii < rrd.stat_head->ds_cnt;
1162                           ii++)
1163                           {
1164                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1165                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1166                                     scratch_idx, seasonal_coef);
1167                           }
1168            } /* end RRA Loop */
1169                    if (rrd_test_error()) break;
1170             } /* end elapsed_pdp_st loop */
1171
1172                 if (rrd_test_error()) break;
1173
1174                 /* Ready to write to disk */
1175                 /* Move sequentially through the file, writing one RRA at a time.
1176                  * Note this architecture divorces the computation of CDP with
1177                  * flushing updated RRA entries to disk. */
1178             for(i = 0, rra_start = rra_begin;
1179                 i < rrd.stat_head->rra_cnt;
1180             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1181                 i++) {
1182                 /* is there anything to write for this RRA? If not, continue. */
1183         if (rra_step_cnt[i] == 0) continue;
1184
1185                 /* write the first row */
1186 #ifdef DEBUG
1187         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1188 #endif
1189             rrd.rra_ptr[i].cur_row++;
1190             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1191                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1192                 /* positition on the first row */
1193                 rra_pos_tmp = rra_start +
1194                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1195                 if(rra_pos_tmp != rra_current) {
1196                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1197                       rrd_set_error("seek error in rrd");
1198                       break;
1199                    }
1200                    rra_current = rra_pos_tmp;
1201                 }
1202
1203 #ifdef DEBUG
1204             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1205 #endif
1206                 scratch_idx = CDP_primary_val;
1207                 if (pcdp_summary != NULL)
1208                 {
1209                    rra_time = (current_time - current_time 
1210                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1212                 }
1213                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1214                    pcdp_summary, &rra_time);
1215                 if (rrd_test_error()) break;
1216
1217                 /* write other rows of the bulk update, if any */
1218                 scratch_idx = CDP_secondary_val;
1219                 for ( ; rra_step_cnt[i] > 1; 
1220                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1221                 {
1222                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1223                    {
1224 #ifdef DEBUG
1225               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1226                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1227 #endif
1228                           /* wrap */
1229                           rrd.rra_ptr[i].cur_row = 0;
1230                           /* seek back to beginning of current rra */
1231                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1232                           {
1233                          rrd_set_error("seek error in rrd");
1234                          break;
1235                           }
1236 #ifdef DEBUG
1237                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1238 #endif
1239                           rra_current = rra_start;
1240                    }
1241                    if (pcdp_summary != NULL)
1242                    {
1243                       rra_time = (current_time - current_time 
1244                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1245                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1246                    }
1247                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248                       pcdp_summary, &rra_time);
1249                 }
1250                 
1251                 if (rrd_test_error())
1252                   break;
1253                 } /* RRA LOOP */
1254
1255             /* break out of the argument parsing loop if error_string is set */
1256             if (rrd_test_error()){
1257                    free(step_start);
1258                    break;
1259             } 
1260             
1261         } /* endif a pdp_st has occurred */ 
1262         rrd.live_head->last_up = current_time;
1263         rrd.live_head->last_up_usec = current_time_usec; 
1264         free(step_start);
1265     } /* function argument loop */
1266
1267     if (seasonal_coef != NULL) free(seasonal_coef);
1268     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1269         if (rra_step_cnt != NULL) free(rra_step_cnt);
1270     rpnstack_free(&rpnstack);
1271
1272     /* if we got here and if there is an error and if the file has not been
1273      * written to, then close things up and return. */
1274     if (rrd_test_error()) {
1275         free(updvals);
1276         free(tmpl_idx);
1277         rrd_free(&rrd);
1278         free(pdp_temp);
1279         free(pdp_new);
1280         fclose(rrd_file);
1281         return(-1);
1282     }
1283
1284     /* aargh ... that was tough ... so many loops ... anyway, its done.
1285      * we just need to write back the live header portion now*/
1286
1287     if (fseek(rrd_file, (sizeof(stat_head_t)
1288                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1289                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1290               SEEK_SET) != 0) {
1291         rrd_set_error("seek rrd for live header writeback");
1292         free(updvals);
1293         free(tmpl_idx);
1294         rrd_free(&rrd);
1295         free(pdp_temp);
1296         free(pdp_new);
1297         fclose(rrd_file);
1298         return(-1);
1299     }
1300
1301     if(version >= 3) {
1302             if(fwrite( rrd.live_head,
1303                        sizeof(live_head_t), 1, rrd_file) != 1){
1304                 rrd_set_error("fwrite live_head to rrd");
1305                 free(updvals);
1306                 rrd_free(&rrd);
1307                 free(tmpl_idx);
1308                 free(pdp_temp);
1309                 free(pdp_new);
1310                 fclose(rrd_file);
1311                 return(-1);
1312             }
1313     }
1314     else {
1315             if(fwrite( &rrd.live_head->last_up,
1316                        sizeof(time_t), 1, rrd_file) != 1){
1317                 rrd_set_error("fwrite live_head to rrd");
1318                 free(updvals);
1319                 rrd_free(&rrd);
1320                 free(tmpl_idx);
1321                 free(pdp_temp);
1322                 free(pdp_new);
1323                 fclose(rrd_file);
1324                 return(-1);
1325             }
1326     }
1327             
1328
1329     if(fwrite( rrd.pdp_prep,
1330                sizeof(pdp_prep_t),
1331                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332         rrd_set_error("ftwrite pdp_prep to rrd");
1333         free(updvals);
1334         rrd_free(&rrd);
1335         free(tmpl_idx);
1336         free(pdp_temp);
1337         free(pdp_new);
1338         fclose(rrd_file);
1339         return(-1);
1340     }
1341
1342     if(fwrite( rrd.cdp_prep,
1343                sizeof(cdp_prep_t),
1344                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1345        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1346
1347         rrd_set_error("ftwrite cdp_prep to rrd");
1348         free(updvals);
1349         free(tmpl_idx);
1350         rrd_free(&rrd);
1351         free(pdp_temp);
1352         free(pdp_new);
1353         fclose(rrd_file);
1354         return(-1);
1355     }
1356
1357     if(fwrite( rrd.rra_ptr,
1358                sizeof(rra_ptr_t), 
1359                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360         rrd_set_error("fwrite rra_ptr to rrd");
1361         free(updvals);
1362         free(tmpl_idx);
1363         rrd_free(&rrd);
1364         free(pdp_temp);
1365         free(pdp_new);
1366         fclose(rrd_file);
1367         return(-1);
1368     }
1369
1370     /* OK now close the files and free the memory */
1371     if(fclose(rrd_file) != 0){
1372         rrd_set_error("closing rrd");
1373         free(updvals);
1374         free(tmpl_idx);
1375         rrd_free(&rrd);
1376         free(pdp_temp);
1377         free(pdp_new);
1378         return(-1);
1379     }
1380
1381     /* calling the smoothing code here guarantees at most
1382          * one smoothing operation per rrd_update call. Unfortunately,
1383          * it is possible with bulk updates, or a long-delayed update
1384          * for smoothing to occur off-schedule. This really isn't
1385          * critical except during the burning cycles. */
1386         if (schedule_smooth)
1387         {
1388 #ifndef WIN32
1389           rrd_file = fopen(filename,"r+");
1390 #else
1391           rrd_file = fopen(filename,"rb+");
1392 #endif
1393           rra_start = rra_begin;
1394           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1395           {
1396             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1397                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1398             {
1399 #ifdef DEBUG
1400               fprintf(stderr,"Running smoother for rra %ld\n",i);
1401 #endif
1402               apply_smoother(&rrd,i,rra_start,rrd_file);
1403               if (rrd_test_error())
1404                 break;
1405             }
1406             rra_start += rrd.rra_def[i].row_cnt
1407               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1408           }
1409           fclose(rrd_file);
1410         }
1411     rrd_free(&rrd);
1412     free(updvals);
1413     free(tmpl_idx);
1414     free(pdp_new);
1415     free(pdp_temp);
1416     return(0);
1417 }
1418
1419 /*
1420  * get exclusive lock to whole file.
1421  * lock gets removed when we close the file
1422  *
1423  * returns 0 on success
1424  */
1425 int
1426 LockRRD(FILE *rrdfile)
1427 {
1428     int rrd_fd;         /* File descriptor for RRD */
1429     int                 stat;
1430
1431     rrd_fd = fileno(rrdfile);
1432
1433         {
1434 #ifndef WIN32    
1435                 struct flock    lock;
1436     lock.l_type = F_WRLCK;    /* exclusive write lock */
1437     lock.l_len = 0;           /* whole file */
1438     lock.l_start = 0;         /* start of file */
1439     lock.l_whence = SEEK_SET;   /* end of file */
1440
1441     stat = fcntl(rrd_fd, F_SETLK, &lock);
1442 #else
1443                 struct _stat st;
1444
1445                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1446                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1447                 } else {
1448                         stat = -1;
1449                 }
1450 #endif
1451         }
1452
1453     return(stat);
1454 }
1455
1456
1457 info_t
1458 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1459                unsigned short CDP_scratch_idx, FILE *rrd_file,
1460                    info_t *pcdp_summary, time_t *rra_time)
1461 {
1462    unsigned long ds_idx, cdp_idx;
1463    infoval iv;
1464   
1465    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1466    {
1467       /* compute the cdp index */
1468       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1469 #ifdef DEBUG
1470           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1471              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1472              rrd -> rra_def[rra_idx].cf_nam);
1473 #endif 
1474       if (pcdp_summary != NULL)
1475           {
1476              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1477              /* append info to the return hash */
1478                  pcdp_summary = info_push(pcdp_summary,
1479                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1480                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1481                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1482          RD_I_VAL, iv);
1483           }
1484           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1485                  sizeof(rrd_value_t),1,rrd_file) != 1)
1486           { 
1487              rrd_set_error("writing rrd");
1488              return 0;
1489           }
1490           *rra_current += sizeof(rrd_value_t);
1491         }
1492         return (pcdp_summary);
1493 }