replaced time_value with rrd_time_value as MacOS X introduced a struct of that name...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.14  2003/11/11 19:46:21  oetiker
9  * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
10  *
11  * Revision 1.13  2003/11/11 19:38:03  oetiker
12  * rrd files should NOT change size ever ... bulk update code wa buggy.
13  * -- David M. Grimes <dgrimes@navisite.com>
14  *
15  * Revision 1.12  2003/09/04 13:16:12  oetiker
16  * should not assigne but compare ... grrrrr
17  *
18  * Revision 1.11  2003/09/02 21:58:35  oetiker
19  * be pickier about what we accept in rrd_update. Complain if things do not work out
20  *
21  * Revision 1.10  2003/04/29 19:14:12  jake
22  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
23  * Also revert accidental addition of -I to aclocal MakeMakefile.
24  *
25  * Revision 1.9  2003/04/25 18:35:08  jake
26  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
27  *
28  * Revision 1.8  2003/03/31 21:22:12  oetiker
29  * enables RRDtool updates with microsecond or in case of windows millisecond
30  * precision. This is needed to reduce time measurement error when archive step
31  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
32  *
33  * Revision 1.7  2003/02/13 07:05:27  oetiker
34  * Find attached the patch I promised to send to you. Please note that there
35  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
36  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
37  * library is identical to librrd, but it contains support code for per-thread
38  * global variables currently used for error information only. This is similar
39  * to how errno per-thread variables are implemented.  librrd_th must be linked
40  * alongside of libpthred
41  *
42  * There is also a new file "THREADS", holding some documentation.
43  *
44  * -- Peter Stamfest <peter@stamfest.at>
45  *
46  * Revision 1.6  2002/02/01 20:34:49  oetiker
47  * fixed version number and date/time
48  *
49  * Revision 1.5  2001/05/09 05:31:01  oetiker
50  * Bug fix: when update of multiple PDP/CDP RRAs coincided
51  * with interpolation of multiple PDPs an incorrect value was
52  * stored as the CDP. Especially evident for GAUGE data sources.
53  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
54  *
55  * Revision 1.4  2001/03/10 23:54:41  oetiker
56  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
57  * parser and calculator from rrd_graph and puts then in a new file,
58  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
59  * clean-up of aberrant behavior stuff, including a bug fix.
60  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
61  * -- Jake Brutlag <jakeb@corp.webtv.net>
62  *
63  * Revision 1.3  2001/03/04 13:01:55  oetiker
64  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
65  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
66  * This is backwards compatible! But new files using the Aberrant stuff are not readable
67  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
68  * -- Jake Brutlag <jakeb@corp.webtv.net>
69  *
70  * Revision 1.2  2001/03/04 11:14:25  oetiker
71  * added at-style-time@value:value syntax to rrd_update
72  * --  Dave Bodenstab <imdave@mcs.net>
73  *
74  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
75  * checkin
76  *
77  *****************************************************************************/
78
79 #include "rrd_tool.h"
80 #include <sys/types.h>
81 #include <fcntl.h>
82
83 #ifdef WIN32
84  #include <sys/locking.h>
85  #include <sys/stat.h>
86  #include <io.h>
87 #endif
88
89 #include "rrd_hw.h"
90 #include "rrd_rpncalc.h"
91
92 #include "rrd_is_thread_safe.h"
93
94 #ifdef WIN32
95 /*
96  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
97  * replacement.
98  */
99 #include <sys/timeb.h>
100
/* Stand-in for the struct timeval that WIN32 does not provide. */
struct timeval {
    time_t tv_sec;   /* whole seconds */
    long   tv_usec;  /* microseconds */
};
105
/* Timezone descriptor accepted (as second argument) by the WIN32
 * gettimeofday replacement in this file. */
struct __timezone {
    int tz_minuteswest;  /* minutes W of Greenwich */
    int tz_dsttime;      /* type of dst correction */
};
110
111 static gettimeofday(struct timeval *t, struct __timezone *tz) {
112         
113         struct timeb current_time;
114
115         _ftime(&current_time);
116         
117         t->tv_sec  = current_time.time;
118         t->tv_usec = current_time.millitm * 1000;
119 }
120
121 #endif
/*
 * Normalize a time as returned by gettimeofday so that the usec part is
 * never negative: when tv_usec is below zero, one second is borrowed from
 * tv_sec and added to tv_usec as 1000000 microseconds.  A single borrow
 * is performed; non-negative values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;     /* already normalized */
    }
    t->tv_usec += 1000000L;
    t->tv_sec--;
}
133
134 /* Local prototypes */
135 int LockRRD(FILE *rrd_file);
136 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
137                                         unsigned long *rra_current,
138                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
139                                         info_t *pcdp_summary, time_t *rra_time);
140 int rrd_update_r(char *filename, char *template, int argc, char **argv);
141 int _rrd_update(char *filename, char *template, int argc, char **argv, 
142                                         info_t*);
143
/* Yields Y when X is NaN, otherwise X.  The expansion deliberately carries
 * no trailing semicolon so the macro can be used inside larger expressions.
 * Note: X is evaluated twice when it is not NaN, so avoid arguments with
 * side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
145
146
147 #ifdef STANDALONE
/*
 * Stand-alone rrdupdate entry point: hands the command line to rrd_update
 * and, when an error has been recorded, prints the usage text followed by
 * the error message.
 *
 * Returns 0 on success, 1 when rrd_update left an error behind.
 */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    if (!rrd_test_error()) {
        return 0;
    }

    printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");

    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
165 #endif
166
167 info_t *rrd_update_v(int argc, char **argv)
168 {
169     char             *template = NULL;          
170         info_t *result = NULL;
171         infoval rc;
172
173     while (1) {
174                 static struct option long_options[] =
175                         {
176                                 {"template",      required_argument, 0, 't'},
177                                 {0,0,0,0}
178                         };
179                 int option_index = 0;
180                 int opt;
181                 opt = getopt_long(argc, argv, "t:", 
182                                                   long_options, &option_index);
183                 
184                 if (opt == EOF)
185                         break;
186                 
187                 switch(opt) {
188                 case 't':
189                         template = optarg;
190                         break;
191                 
192                 case '?':
193                         rrd_set_error("unknown option '%s'",argv[optind-1]);
194             rc.u_int = -1;
195                         goto end_tag;
196                 }
197     }
198
199     /* need at least 2 arguments: filename, data. */
200     if (argc-optind < 2) {
201                 rrd_set_error("Not enough arguments");
202         rc.u_int = -1;
203                 goto end_tag;
204     }
205     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
206         rc.u_int = _rrd_update(argv[optind], template,
207                       argc - optind - 1, argv + optind + 1, result);
208     result->value.u_int = rc.u_int;
209 end_tag:
210     return result;
211 }
212
/*
 * Command line front end for updating an RRD.
 *
 * Accepts an optional --template/-t option followed by the filename and at
 * least one data argument, and forwards everything after the options to
 * rrd_update_r.
 *
 * Returns the result of rrd_update_r, or -1 (with the rrd error set) when
 * an unknown option is seen or fewer than two arguments remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *template = NULL;
    int   opt;

    for (;;) {
        int option_index = 0;

        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF) {
            break;
        }

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* filename and at least one data argument must be left over */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
255
/*
 * Update an RRD from already separated arguments.
 *
 * Delegates to _rrd_update with a NULL info list and hands its result
 * back unchanged.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
261
262 int
263 _rrd_update(char *filename, char *template, int argc, char **argv, 
264    info_t *pcdp_summary)
265 {
266
267     int              arg_i = 2;
268     short            j;
269     unsigned long    i,ii,iii=1;
270
271     unsigned long    rra_begin;          /* byte pointer to the rra
272                                           * area in the rrd file.  this
273                                           * pointer never changes value */
274     unsigned long    rra_start;          /* byte pointer to the rra
275                                           * area in the rrd file.  this
276                                           * pointer changes as each rrd is
277                                           * processed. */
278     unsigned long    rra_current;        /* byte pointer to the current write
279                                           * spot in the rrd file. */
280     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
281     double           interval,
282         pre_int,post_int;                /* interval between this and
283                                           * the last run */
284     unsigned long    proc_pdp_st;        /* which pdp_st was the last
285                                           * to be processed */
286     unsigned long    occu_pdp_st;        /* when was the pdp_st
287                                           * before the last update
288                                           * time */
289     unsigned long    proc_pdp_age;       /* how old was the data in
290                                           * the pdp prep area when it
291                                           * was last updated */
292     unsigned long    occu_pdp_age;       /* how long ago was the last
293                                           * pdp_step time */
294     rrd_value_t      *pdp_new;           /* prepare the incoming data
295                                           * to be added the the
296                                           * existing entry */
297     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
298                                           * to be added the the
299                                           * cdp values */
300
301     long             *tmpl_idx;          /* index representing the settings
302                                             transported by the template index */
303     unsigned long    tmpl_cnt = 2;       /* time and data */
304
305     FILE             *rrd_file;
306     rrd_t            rrd;
307     time_t           current_time;
308         time_t           rra_time; /* time of update for a RRA */
309     unsigned long    current_time_usec;  /* microseconds part of current time */
310     struct timeval   tmp_time;           /* used for time conversion */
311
312     char             **updvals;
313     int              schedule_smooth = 0;
314         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
315                                          /* a vector of future Holt-Winters seasonal coefs */
316     unsigned long    elapsed_pdp_st;
317                                          /* number of elapsed PDP steps since last update */
318     unsigned long    *rra_step_cnt = NULL;
319                                          /* number of rows to be updated in an RRA for a data
320                                           * value. */
321     unsigned long    start_pdp_offset;
322                                          /* number of PDP steps since the last update that
323                                           * are assigned to the first CDP to be generated
324                                           * since the last update. */
325     unsigned short   scratch_idx;
326                                          /* index into the CDP scratch array */
327     enum cf_en       current_cf;
328                                          /* numeric id of the current consolidation function */
329     rpnstack_t       rpnstack; /* used for COMPUTE DS */
330     int              version;  /* rrd version */
331     char             *endptr; /* used in the conversion */
332
333     rpnstack_init(&rpnstack);
334
335     /* need at least 1 arguments: data. */
336     if (argc < 1) {
337         rrd_set_error("Not enough arguments");
338         return -1;
339     }
340     
341     
342
343     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
344         return -1;
345     }
346     /* initialize time */
347     version = atoi(rrd.stat_head->version);
348     gettimeofday(&tmp_time, 0);
349     normalize_time(&tmp_time);
350     current_time = tmp_time.tv_sec;
351     if(version >= 3) {
352         current_time_usec = tmp_time.tv_usec;
353     }
354     else {
355         current_time_usec = 0;
356     }
357
358     rra_current = rra_start = rra_begin = ftell(rrd_file);
359     /* This is defined in the ANSI C standard, section 7.9.5.3:
360
361         When a file is opened with udpate mode ('+' as the second
362         or third character in the ... list of mode argument
363         variables), both input and ouptut may be performed on the
364         associated stream.  However, ...  input may not be directly
365         followed by output without an intervening call to a file
366         positioning function, unless the input oepration encounters
367         end-of-file. */
368     fseek(rrd_file, 0, SEEK_CUR);
369
370     
371     /* get exclusive lock to whole file.
372      * lock gets removed when we close the file.
373      */
374     if (LockRRD(rrd_file) != 0) {
375       rrd_set_error("could not lock RRD");
376       rrd_free(&rrd);
377       fclose(rrd_file);
378       return(-1);   
379     } 
380
381     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
382         rrd_set_error("allocating updvals pointer array");
383         rrd_free(&rrd);
384         fclose(rrd_file);
385         return(-1);
386     }
387
388     if ((pdp_temp = malloc(sizeof(rrd_value_t)
389                            *rrd.stat_head->ds_cnt))==NULL){
390         rrd_set_error("allocating pdp_temp ...");
391         free(updvals);
392         rrd_free(&rrd);
393         fclose(rrd_file);
394         return(-1);
395     }
396
397     if ((tmpl_idx = malloc(sizeof(unsigned long)
398                            *(rrd.stat_head->ds_cnt+1)))==NULL){
399         rrd_set_error("allocating tmpl_idx ...");
400         free(pdp_temp);
401         free(updvals);
402         rrd_free(&rrd);
403         fclose(rrd_file);
404         return(-1);
405     }
406     /* initialize template redirector */
407     /* default config example (assume DS 1 is a CDEF DS)
408        tmpl_idx[0] -> 0; (time)
409        tmpl_idx[1] -> 1; (DS 0)
410        tmpl_idx[2] -> 3; (DS 2)
411        tmpl_idx[3] -> 4; (DS 3) */
412     tmpl_idx[0] = 0; /* time */
413     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
414         {
415            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
416               tmpl_idx[ii++]=i;
417         }
418     tmpl_cnt= ii;
419
420     if (template) {
421         char *dsname;
422         unsigned int tmpl_len;
423         dsname = template;
424         tmpl_cnt = 1; /* the first entry is the time */
425         tmpl_len = strlen(template);
426         for(i=0;i<=tmpl_len ;i++) {
427             if (template[i] == ':' || template[i] == '\0') {
428                 template[i] = '\0';
429                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
430                     rrd_set_error("Template contains more DS definitions than RRD");
431                     free(updvals); free(pdp_temp);
432                     free(tmpl_idx); rrd_free(&rrd);
433                     fclose(rrd_file); return(-1);
434                 }
435                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
436                     rrd_set_error("unknown DS name '%s'",dsname);
437                     free(updvals); free(pdp_temp);
438                     free(tmpl_idx); rrd_free(&rrd);
439                     fclose(rrd_file); return(-1);
440                 } else {
441                   /* the first element is always the time */
442                   tmpl_idx[tmpl_cnt-1]++; 
443                   /* go to the next entry on the template */
444                   dsname = &template[i+1];
445                   /* fix the damage we did before */
446                   if (i<tmpl_len) {
447                      template[i]=':';
448                   } 
449
450                 }
451             }       
452         }
453     }
454     if ((pdp_new = malloc(sizeof(rrd_value_t)
455                           *rrd.stat_head->ds_cnt))==NULL){
456         rrd_set_error("allocating pdp_new ...");
457         free(updvals);
458         free(pdp_temp);
459         free(tmpl_idx);
460         rrd_free(&rrd);
461         fclose(rrd_file);
462         return(-1);
463     }
464
465     /* loop through the arguments. */
466     for(arg_i=0; arg_i<argc;arg_i++) {
467         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
468         char *step_start = stepper;
469         char *p;
470         char *parsetime_error = NULL;
471         enum {atstyle, normal} timesyntax;
472         struct rrd_time_value ds_tv;
473         if (stepper == NULL){
474                 rrd_set_error("failed duplication argv entry");
475                 free(updvals);
476                 free(pdp_temp);  
477                 free(tmpl_idx);
478                 rrd_free(&rrd);
479                 fclose(rrd_file);
480                 return(-1);
481          }
482         /* initialize all ds input to unknown except the first one
483            which has always got to be set */
484         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
485         strcpy(stepper,argv[arg_i]);
486         updvals[0]=stepper;
487         /* separate all ds elements; first must be examined separately
488            due to alternate time syntax */
489         if ((p=strchr(stepper,'@'))!=NULL) {
490             timesyntax = atstyle;
491             *p = '\0';
492             stepper = p+1;
493         } else if ((p=strchr(stepper,':'))!=NULL) {
494             timesyntax = normal;
495             *p = '\0';
496             stepper = p+1;
497         } else {
498             rrd_set_error("expected timestamp not found in data source from %s:...",
499                           argv[arg_i]);
500             free(step_start);
501             break;
502         }
503         ii=1;
504         updvals[tmpl_idx[ii]] = stepper;
505         while (*stepper) {
506             if (*stepper == ':') {
507                 *stepper = '\0';
508                 ii++;
509                 if (ii<tmpl_cnt){                   
510                     updvals[tmpl_idx[ii]] = stepper+1;
511                 }
512             }
513             stepper++;
514         }
515
516         if (ii != tmpl_cnt-1) {
517             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
518                           tmpl_cnt-1, ii, argv[arg_i]);
519             free(step_start);
520             break;
521         }
522         
523         /* get the time from the reading ... handle N */
524         if (timesyntax == atstyle) {
525             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
526                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
527                 free(step_start);
528                 break;
529             }
530             if (ds_tv.type == RELATIVE_TO_END_TIME ||
531                 ds_tv.type == RELATIVE_TO_START_TIME) {
532                 rrd_set_error("specifying time relative to the 'start' "
533                               "or 'end' makes no sense here: %s",
534                               updvals[0]);
535                 free(step_start);
536                 break;
537             }
538
539             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
540             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
541             
542         } else if (strcmp(updvals[0],"N")==0){
543             gettimeofday(&tmp_time, 0);
544             normalize_time(&tmp_time);
545             current_time = tmp_time.tv_sec;
546             current_time_usec = tmp_time.tv_usec;
547         } else {
548             double tmp;
549             tmp = strtod(updvals[0], 0);
550             current_time = floor(tmp);
551             current_time_usec = (long)((tmp - current_time) * 1000000L);
552         }
553         /* dont do any correction for old version RRDs */
554         if(version < 3) 
555             current_time_usec = 0;
556         
557         if(current_time <= rrd.live_head->last_up){
558             rrd_set_error("illegal attempt to update using time %ld when "
559                           "last update time is %ld (minimum one second step)",
560                           current_time, rrd.live_head->last_up);
561             free(step_start);
562             break;
563         }
564         
565         
566         /* seek to the beginning of the rra's */
567         if (rra_current != rra_begin) {
568             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
569                 rrd_set_error("seek error in rrd");
570                 free(step_start);
571                 break;
572             }
573             rra_current = rra_begin;
574         }
575         rra_start = rra_begin;
576
577         /* when was the current pdp started */
578         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
579         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
580
581         /* when did the last pdp_st occur */
582         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
583         occu_pdp_st = current_time - occu_pdp_age;
584         /* interval = current_time - rrd.live_head->last_up; */
585         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
586     
587         if (occu_pdp_st > proc_pdp_st){
588             /* OK we passed the pdp_st moment*/
589             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
590                                                               * occurred before the latest
591                                                               * pdp_st moment*/
592             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
593             post_int = occu_pdp_age;                         /* how much after it */
594             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
595         } else {
596             pre_int = interval;
597             post_int = 0;
598         }
599
600 #ifdef DEBUG
601         printf(
602                "proc_pdp_age %lu\t"
603                "proc_pdp_st %lu\t" 
604                "occu_pfp_age %lu\t" 
605                "occu_pdp_st %lu\t"
606                "int %lf\t"
607                "pre_int %lf\t"
608                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
609                 occu_pdp_age, occu_pdp_st,
610                interval, pre_int, post_int);
611 #endif
612     
613         /* process the data sources and update the pdp_prep 
614          * area accordingly */
615         for(i=0;i<rrd.stat_head->ds_cnt;i++){
616             enum dst_en dst_idx;
617             dst_idx= dst_conv(rrd.ds_def[i].dst);
618                 /* NOTE: DST_CDEF should never enter this if block, because
619                  * updvals[i+1][0] is initialized to 'U'; unless the caller
620                  * accidently specified a value for the DST_CDEF. To handle 
621                  * this case, an extra check is required. */
622             if((updvals[i+1][0] != 'U') &&
623                    (dst_idx != DST_CDEF) &&
624                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625                double rate = DNAN;
626                /* the data source type defines how to process the data */
627                 /* pdp_new contains rate * time ... eg the bytes
628                  * transferred during the interval. Doing it this way saves
629                  * a lot of math operations */
630                 
631
632                 switch(dst_idx){
633                 case DST_COUNTER:
634                 case DST_DERIVE:
635                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
637                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639                                  break;
640                             }
641                        }
642                        if (rrd_test_error()){
643                             break;
644                        }
645                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646                        if(dst_idx == DST_COUNTER) {
647                           /* simple overflow catcher sugestet by andres kroonmaa */
648                           /* this will fail terribly for non 32 or 64 bit counters ... */
649                           /* are there any others in SNMP land ? */
650                           if (pdp_new[i] < (double)0.0 ) 
651                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
652                           if (pdp_new[i] < (double)0.0 ) 
653                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654                        }
655                        rate = pdp_new[i] / interval;
656                     }
657                    else {
658                      pdp_new[i]= DNAN;          
659                    }
660                    break;
661                 case DST_ABSOLUTE:
662                     errno = 0;
663                     pdp_new[i] = strtod(updvals[i+1],&endptr);
664                     if (errno > 0){
665                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666                         break;
667                     };
668                     if (endptr[0] != '\0'){
669                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670                         break;
671                     }
672                     rate = pdp_new[i] / interval;                 
673                     break;
674                 case DST_GAUGE:
675                     errno = 0;
676                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677                     if (errno > 0){
678                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679                         break;
680                     };
681                     if (endptr[0] != '\0'){
682                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683                         break;
684                     }
685                     rate = pdp_new[i] / interval;                  
686                     break;
687                 default:
688                     rrd_set_error("rrd contains unknown DS type : '%s'",
689                                   rrd.ds_def[i].dst);
690                     break;
691                 }
692                 /* break out of this for loop if the error string is set */
693                 if (rrd_test_error()){
694                     break;
695                 }
696                /* make sure pdp_temp is neither too large or too small
697                 * if any of these occur it becomes unknown ...
698                 * sorry folks ... */
699                if ( ! isnan(rate) && 
700                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
702                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704                   pdp_new[i] = DNAN;
705                }               
706             } else {
707                 /* no news is news all the same */
708                 pdp_new[i] = DNAN;
709             }
710             
711             /* make a copy of the command line argument for the next run */
712 #ifdef DEBUG
713             fprintf(stderr,
714                     "prep ds[%lu]\t"
715                     "last_arg '%s'\t"
716                     "this_arg '%s'\t"
717                     "pdp_new %10.2f\n",
718                     i,
719                     rrd.pdp_prep[i].last_ds,
720                     updvals[i+1], pdp_new[i]);
721 #endif
722             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723                 strncpy(rrd.pdp_prep[i].last_ds,
724                         updvals[i+1],LAST_DS_LEN-1);
725                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
726             }
727         }
728         /* break out of the argument parsing loop if the error_string is set */
729         if (rrd_test_error()){
730             free(step_start);
731             break;
732         }
733         /* has a pdp_st moment occurred since the last run ? */
734
735         if (proc_pdp_st == occu_pdp_st){
736             /* no we have not passed a pdp_st moment. therefore update is simple */
737
738             for(i=0;i<rrd.stat_head->ds_cnt;i++){
739                 if(isnan(pdp_new[i]))
740                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
741                 else
742                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
743 #ifdef DEBUG
744                 fprintf(stderr,
745                         "NO PDP  ds[%lu]\t"
746                         "value %10.2f\t"
747                         "unkn_sec %5lu\n",
748                         i,
749                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
750                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
751 #endif
752             }   
753         } else {
754             /* a pdp_st has occurred. */
755
756             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
757              * occurred up to the last run.        
758             pdp_new[] contains rate*seconds from the latest run.
759             pdp_temp[] will contain the rate for cdp */
760
761             for(i=0;i<rrd.stat_head->ds_cnt;i++){
762                 /* update pdp_prep to the current pdp_st */
763                 if(isnan(pdp_new[i]))
764                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
765                 else
766                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
767                         pdp_new[i]/(double)interval*(double)pre_int;
768
769                 /* if too much of the pdp_prep is unknown we dump it */
770                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
771                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
772                     (occu_pdp_st-proc_pdp_st <= 
773                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
774                     pdp_temp[i] = DNAN;
775                 } else {
776                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
777                         / (double)( occu_pdp_st
778                                    - proc_pdp_st
779                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
780                 }
781
782                 /* process CDEF data sources; remember each CDEF DS can
783                  * only reference other DS with a lower index number */
784             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
785                    rpnp_t *rpnp;
786                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
787                    /* substitute data values for OP_VARIABLE nodes */
788                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
789                    {
790                           if (rpnp[ii].op == OP_VARIABLE) {
791                                  rpnp[ii].op = OP_NUMBER;
792                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
793                           }
794                    }
795                    /* run the rpn calculator */
796                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
797                           free(rpnp);
798                           break; /* exits the data sources pdp_temp loop */
799                    }
800                 }
801         
802                 /* make pdp_prep ready for the next run */
803                 if(isnan(pdp_new[i])){
804                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
805                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
806                 } else {
807                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
808                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
809                         pdp_new[i]/(double)interval*(double)post_int;
810                 }
811
812 #ifdef DEBUG
813                 fprintf(stderr,
814                         "PDP UPD ds[%lu]\t"
815                         "pdp_temp %10.2f\t"
816                         "new_prep %10.2f\t"
817                         "new_unkn_sec %5lu\n",
818                         i, pdp_temp[i],
819                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
820                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
821 #endif
822             }
823
824                 /* if there were errors during the last loop, bail out here */
825             if (rrd_test_error()){
826                free(step_start);
827                break;
828             }
829
830                 /* compute the number of elapsed pdp_st moments */
831                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
832 #ifdef DEBUG
833                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
834 #endif
835                 if (rra_step_cnt == NULL)
836                 {
837                    rra_step_cnt = (unsigned long *) 
838                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
839                 }
840
841             for(i = 0, rra_start = rra_begin;
842                 i < rrd.stat_head->rra_cnt;
843             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
844                 i++)
845                 {
846                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
847                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
848                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
849         if (start_pdp_offset <= elapsed_pdp_st) {
850            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
851                       rrd.rra_def[i].pdp_cnt + 1;
852             } else {
853                    rra_step_cnt[i] = 0;
854                 }
855
856                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
857                 {
858                    /* If this is a bulk update, we need to skip ahead in the seasonal
859                         * arrays so that they will be correct for the next observed value;
860                         * note that for the bulk update itself, no update will occur to
861                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
862                         * be set to DNAN. */
863            if (rra_step_cnt[i] > 2) 
864                    {
865                           /* skip update by resetting rra_step_cnt[i],
866                            * note that this is not data source specific; this is due
867                            * to the bulk update, not a DNAN value for the specific data
868                            * source. */
869                           rra_step_cnt[i] = 0;
870               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
871                              &last_seasonal_coef);
872                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
873                              &seasonal_coef);
874                    }
875                 
876                   /* periodically run a smoother for seasonal effects */
877                   /* Need to use first cdp parameter buffer to track
878                    * burnin (burnin requires a specific smoothing schedule).
879                    * The CDP_init_seasonal parameter is really an RRA level,
880                    * not a data source within RRA level parameter, but the rra_def
881                    * is read only for rrd_update (not flushed to disk). */
882                   iii = i*(rrd.stat_head -> ds_cnt);
883                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
884                           <= BURNIN_CYCLES)
885                   {
886                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
887                                  > rrd.rra_def[i].row_cnt - 1) {
888                            /* mark off one of the burnin cycles */
889                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
890                        schedule_smooth = 1;
891                          }  
892                   } else {
893                          /* someone has no doubt invented a trick to deal with this
894                           * wrap around, but at least this code is clear. */
895                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
896                              rrd.rra_ptr[i].cur_row)
897                          {
898                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
899                                   * mapping between PDP and CDP */
900                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
901                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
902                                  {
903 #ifdef DEBUG
904                                         fprintf(stderr,
905                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
906                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
907                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
908 #endif
909                                         schedule_smooth = 1;
910                                  }
911              } else {
912                                  /* can't rely on negative numbers because we are working with
913                                   * unsigned values */
914                                  /* Don't need modulus here. If we've wrapped more than once, only
915                                   * one smooth is executed at the end. */
916                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
917                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
918                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
919                                  {
920 #ifdef DEBUG
921                                         fprintf(stderr,
922                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
923                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
924                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
925 #endif
926                                         schedule_smooth = 1;
927                                  }
928                          }
929                   }
930
931               rra_current = ftell(rrd_file); 
932                 } /* if cf is DEVSEASONAL or SEASONAL */
933
934         if (rrd_test_error()) break;
935
936                     /* update CDP_PREP areas */
937                     /* loop over data sources within each RRA */
938                     for(ii = 0;
939                         ii < rrd.stat_head->ds_cnt;
940                         ii++)
941                         {
942                         
943                         /* iii indexes the CDP prep area for this data source within the RRA */
944                         iii=i*rrd.stat_head->ds_cnt+ii;
945
946                         if (rrd.rra_def[i].pdp_cnt > 1) {
947                           
948                            if (rra_step_cnt[i] > 0) {
949                            /* If we are in this block, at least 1 CDP value will be written to
950                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
951                                 * to be written, then the "fill in" value is the CDP_secondary_val
952                                 * entry. */
953                                   if (isnan(pdp_temp[ii]))
954                   {
955                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
956                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
957                                   } else {
958                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
959                                           * CDP data entries. No matter the CF, the value is the same because
960                                           * the average, max, min, and last of a list of identical values is
961                                           * the same, namely, the value itself. */
962                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
963                                   }
964                      
965                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
966                                       > rrd.rra_def[i].pdp_cnt*
967                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
968                                   {
969                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
970                                          /* initialize carry over */
971                                          if (current_cf == CF_AVERAGE) {
972                                                    if (isnan(pdp_temp[ii])) { 
973                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
974                                                    } else {
975                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
977                                                    }
978                                          } else {
979                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
980                                          }
981                                   } else {
982                                          rrd_value_t cum_val, cur_val; 
983                                      switch (current_cf) {
984                                                 case CF_AVERAGE:
985                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
986                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
987                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
988                                                (cum_val + cur_val * start_pdp_offset) /
989                                            (rrd.rra_def[i].pdp_cnt
990                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
991                                                    /* initialize carry over value */
992                                                    if (isnan(pdp_temp[ii])) { 
993                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
994                                                    } else {
995                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
996                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
997                                                    }
998                                                    break;
999                                                 case CF_MAXIMUM:
1000                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1001                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1002 #ifdef DEBUG
1003                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1004                                                           isnan(pdp_temp[ii])) {
1005                                                      fprintf(stderr,
1006                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1007                                                                 i,ii);
1008                                                          exit(-1);
1009                                                   }
1010 #endif
1011                                                   if (cur_val > cum_val)
1012                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1013                                                   else
1014                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1015                                                   /* initialize carry over value */
1016                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1017                                                   break;
1018                                                 case CF_MINIMUM:
1019                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1020                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1021 #ifdef DEBUG
1022                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1023                                                           isnan(pdp_temp[ii])) {
1024                                                      fprintf(stderr,
1025                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1026                                                                 i,ii);
1027                                                          exit(-1);
1028                                                   }
1029 #endif
1030                                                   if (cur_val < cum_val)
1031                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1032                                                   else
1033                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1034                                                   /* initialize carry over value */
1035                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1036                                                   break;
1037                                                 case CF_LAST:
1038                                                 default:
1039                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1040                                                    /* initialize carry over value */
1041                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042                                                 break;
1043                                          }
1044                                   } /* endif meets xff value requirement for a valid value */
1045                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1046                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1047                                   if (isnan(pdp_temp[ii]))
1048                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1049                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1050                                   else
1051                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1052                } else  /* rra_step_cnt[i]  == 0 */
1053                            {
1054 #ifdef DEBUG
1055                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1056                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1057                                          i,ii);
1058                                   } else {
1059                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1060                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1061                                   }
1062 #endif
1063                                   if (isnan(pdp_temp[ii])) {
1064                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1065                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1066                                   {
1067                                          if (current_cf == CF_AVERAGE) {
1068                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1069                                                    elapsed_pdp_st;
1070                                          } else {
1071                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1072                                          }
1073 #ifdef DEBUG
1074                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1075                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1076 #endif
1077                                   } else {
1078                                          switch (current_cf) {
1079                                          case CF_AVERAGE:
1080                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1081                                                    elapsed_pdp_st;
1082                                                 break;
1083                                          case CF_MINIMUM:
1084                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1085                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086                                                 break; 
1087                                          case CF_MAXIMUM:
1088                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1089                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090                                                 break; 
1091                                          case CF_LAST:
1092                                          default:
1093                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094                                                 break;
1095                                          }
1096                                   }
1097                            }
1098                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1099                            if (elapsed_pdp_st > 2)
1100                            {
1101                                    switch (current_cf) {
1102                                    case CF_AVERAGE:
1103                                    default:
1104                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1105                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1106                                           break;
1107                    case CF_SEASONAL:
1108                                    case CF_DEVSEASONAL:
1109                                           /* need to update cached seasonal values, so they are consistent
1110                                            * with the bulk update */
1111                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1112                                            * CDP_last_deviation are the same. */
1113                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1114                                                  last_seasonal_coef[ii];
1115                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1116                                                  seasonal_coef[ii];
1117                                           break;
1118                    case CF_HWPREDICT:
1119                                           /* need to update the null_count and last_null_count.
1120                                            * even do this for non-DNAN pdp_temp because the
1121                                            * algorithm is not learning from batch updates. */
1122                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1123                                                  elapsed_pdp_st;
1124                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1125                                                  elapsed_pdp_st - 1;
1126                                           /* fall through */
1127                                    case CF_DEVPREDICT:
1128                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1129                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1130                                           break;
1131                    case CF_FAILURES:
1132                                           /* do not count missed bulk values as failures */
1133                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1134                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1135                                           /* need to reset violations buffer.
1136                                            * could do this more carefully, but for now, just
1137                                            * assume a bulk update wipes away all violations. */
1138                       erase_violations(&rrd, iii, i);
1139                                           break;
1140                                    }
1141                            } 
1142                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1143
1144                         if (rrd_test_error()) break;
1145
1146                         } /* endif data sources loop */
1147         } /* end RRA Loop */
1148
1149                 /* this loop is only entered if elapsed_pdp_st < 3 */
1150                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1151                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1152                 {
1153                for(i = 0, rra_start = rra_begin;
1154                    i < rrd.stat_head->rra_cnt;
1155                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1156                    i++)
1157                    {
1158                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1159
1160                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1161                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1162                           {
1163                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1164                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1165                                 &seasonal_coef);
1166                  rra_current = ftell(rrd_file);
1167                           }
1168                           if (rrd_test_error()) break;
1169                       /* loop over data sources within each RRA */
1170                       for(ii = 0;
1171                           ii < rrd.stat_head->ds_cnt;
1172                           ii++)
1173                           {
1174                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1175                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1176                                     scratch_idx, seasonal_coef);
1177                           }
1178            } /* end RRA Loop */
1179                    if (rrd_test_error()) break;
1180             } /* end elapsed_pdp_st loop */
1181
1182                 if (rrd_test_error()) break;
1183
1184                 /* Ready to write to disk */
1185                 /* Move sequentially through the file, writing one RRA at a time.
1186                  * Note this architecture divorces the computation of CDP with
1187                  * flushing updated RRA entries to disk. */
1188             for(i = 0, rra_start = rra_begin;
1189                 i < rrd.stat_head->rra_cnt;
1190             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1191                 i++) {
1192                 /* is there anything to write for this RRA? If not, continue. */
1193         if (rra_step_cnt[i] == 0) continue;
1194
1195                 /* write the first row */
1196 #ifdef DEBUG
1197         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1198 #endif
1199             rrd.rra_ptr[i].cur_row++;
1200             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1201                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1202                 /* position on the first row */
1203                 rra_pos_tmp = rra_start +
1204                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1205                 if(rra_pos_tmp != rra_current) {
1206                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1207                       rrd_set_error("seek error in rrd");
1208                       break;
1209                    }
1210                    rra_current = rra_pos_tmp;
1211                 }
1212
1213 #ifdef DEBUG
1214             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1215 #endif
1216                 scratch_idx = CDP_primary_val;
1217                 if (pcdp_summary != NULL)
1218                 {
1219                    rra_time = (current_time - current_time 
1220                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1221                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1222                 }
1223                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1224                    pcdp_summary, &rra_time);
1225                 if (rrd_test_error()) break;
1226
1227                 /* write other rows of the bulk update, if any */
1228                 scratch_idx = CDP_secondary_val;
1229                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1230                 {
1231                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1232                    {
1233 #ifdef DEBUG
1234               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1235                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1236 #endif
1237                           /* wrap */
1238                           rrd.rra_ptr[i].cur_row = 0;
1239                           /* seek back to beginning of current rra */
1240                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1241                           {
1242                          rrd_set_error("seek error in rrd");
1243                          break;
1244                           }
1245 #ifdef DEBUG
1246                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1247 #endif
1248                           rra_current = rra_start;
1249                    }
1250                    if (pcdp_summary != NULL)
1251                    {
1252                       rra_time = (current_time - current_time 
1253                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1254                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1255                    }
1256                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1257                       pcdp_summary, &rra_time);
1258                 }
1259                 
1260                 if (rrd_test_error())
1261                   break;
1262                 } /* RRA LOOP */
1263
1264             /* break out of the argument parsing loop if error_string is set */
1265             if (rrd_test_error()){
1266                    free(step_start);
1267                    break;
1268             } 
1269             
1270         } /* endif a pdp_st has occurred */ 
1271         rrd.live_head->last_up = current_time;
1272         rrd.live_head->last_up_usec = current_time_usec; 
1273         free(step_start);
1274     } /* function argument loop */
1275
1276     if (seasonal_coef != NULL) free(seasonal_coef);
1277     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278         if (rra_step_cnt != NULL) free(rra_step_cnt);
1279     rpnstack_free(&rpnstack);
1280
1281     /* if we got here and if there is an error and if the file has not been
1282      * written to, then close things up and return. */
1283     if (rrd_test_error()) {
1284         free(updvals);
1285         free(tmpl_idx);
1286         rrd_free(&rrd);
1287         free(pdp_temp);
1288         free(pdp_new);
1289         fclose(rrd_file);
1290         return(-1);
1291     }
1292
1293     /* aargh ... that was tough ... so many loops ... anyway, its done.
1294      * we just need to write back the live header portion now*/
1295
1296     if (fseek(rrd_file, (sizeof(stat_head_t)
1297                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1298                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1299               SEEK_SET) != 0) {
1300         rrd_set_error("seek rrd for live header writeback");
1301         free(updvals);
1302         free(tmpl_idx);
1303         rrd_free(&rrd);
1304         free(pdp_temp);
1305         free(pdp_new);
1306         fclose(rrd_file);
1307         return(-1);
1308     }
1309
1310     if(version >= 3) {
1311             if(fwrite( rrd.live_head,
1312                        sizeof(live_head_t), 1, rrd_file) != 1){
1313                 rrd_set_error("fwrite live_head to rrd");
1314                 free(updvals);
1315                 rrd_free(&rrd);
1316                 free(tmpl_idx);
1317                 free(pdp_temp);
1318                 free(pdp_new);
1319                 fclose(rrd_file);
1320                 return(-1);
1321             }
1322     }
1323     else {
1324             if(fwrite( &rrd.live_head->last_up,
1325                        sizeof(time_t), 1, rrd_file) != 1){
1326                 rrd_set_error("fwrite live_head to rrd");
1327                 free(updvals);
1328                 rrd_free(&rrd);
1329                 free(tmpl_idx);
1330                 free(pdp_temp);
1331                 free(pdp_new);
1332                 fclose(rrd_file);
1333                 return(-1);
1334             }
1335     }
1336             
1337
1338     if(fwrite( rrd.pdp_prep,
1339                sizeof(pdp_prep_t),
1340                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1341         rrd_set_error("ftwrite pdp_prep to rrd");
1342         free(updvals);
1343         rrd_free(&rrd);
1344         free(tmpl_idx);
1345         free(pdp_temp);
1346         free(pdp_new);
1347         fclose(rrd_file);
1348         return(-1);
1349     }
1350
1351     if(fwrite( rrd.cdp_prep,
1352                sizeof(cdp_prep_t),
1353                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1354        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1355
1356         rrd_set_error("ftwrite cdp_prep to rrd");
1357         free(updvals);
1358         free(tmpl_idx);
1359         rrd_free(&rrd);
1360         free(pdp_temp);
1361         free(pdp_new);
1362         fclose(rrd_file);
1363         return(-1);
1364     }
1365
1366     if(fwrite( rrd.rra_ptr,
1367                sizeof(rra_ptr_t), 
1368                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1369         rrd_set_error("fwrite rra_ptr to rrd");
1370         free(updvals);
1371         free(tmpl_idx);
1372         rrd_free(&rrd);
1373         free(pdp_temp);
1374         free(pdp_new);
1375         fclose(rrd_file);
1376         return(-1);
1377     }
1378
1379     /* OK now close the files and free the memory */
1380     if(fclose(rrd_file) != 0){
1381         rrd_set_error("closing rrd");
1382         free(updvals);
1383         free(tmpl_idx);
1384         rrd_free(&rrd);
1385         free(pdp_temp);
1386         free(pdp_new);
1387         return(-1);
1388     }
1389
1390     /* calling the smoothing code here guarantees at most
1391          * one smoothing operation per rrd_update call. Unfortunately,
1392          * it is possible with bulk updates, or a long-delayed update
1393          * for smoothing to occur off-schedule. This really isn't
1394          * critical except during the burning cycles. */
1395         if (schedule_smooth)
1396         {
1397 #ifndef WIN32
1398           rrd_file = fopen(filename,"r+");
1399 #else
1400           rrd_file = fopen(filename,"rb+");
1401 #endif
1402           rra_start = rra_begin;
1403           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1404           {
1405             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1406                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1407             {
1408 #ifdef DEBUG
1409               fprintf(stderr,"Running smoother for rra %ld\n",i);
1410 #endif
1411               apply_smoother(&rrd,i,rra_start,rrd_file);
1412               if (rrd_test_error())
1413                 break;
1414             }
1415             rra_start += rrd.rra_def[i].row_cnt
1416               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1417           }
1418           fclose(rrd_file);
1419         }
1420     rrd_free(&rrd);
1421     free(updvals);
1422     free(tmpl_idx);
1423     free(pdp_new);
1424     free(pdp_temp);
1425     return(0);
1426 }
1427
/*
 * Try to take an exclusive lock on the whole RRD file without blocking.
 *
 * rrdfile - open stdio stream for the RRD; its descriptor is what gets locked.
 *
 * On non-WIN32 builds this issues fcntl(F_SETLK) with a write lock that
 * starts at offset 0 (relative to the beginning of the file) and has a
 * length of 0, which means "through the end of the file".  On WIN32 the
 * file size is obtained with _fstat() and that many bytes are locked with
 * _locking(_LK_NBLCK).
 *
 * The lock is not released explicitly anywhere in this function; it goes
 * away when the file is closed.
 *
 * Returns the result of the underlying locking call: 0 on success,
 * -1 when the lock could not be obtained (or, on WIN32, when _fstat()
 * fails).
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);

#ifdef WIN32
    struct _stat file_info;

    /* without the file size we do not know how many bytes to lock */
    if (_fstat(fd, &file_info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, file_info.st_size);
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is counted from the file start */
    whole_file.l_start  = 0;          /* begin at the very first byte */
    whole_file.l_len    = 0;          /* 0 == extend the lock to the end of the file */
    whole_file.l_type   = F_WRLCK;    /* exclusive (write) lock */

    return fcntl(fd, F_SETLK, &whole_file);
#endif
}
1464
1465
1466 info_t
1467 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1468                unsigned short CDP_scratch_idx, FILE *rrd_file,
1469                    info_t *pcdp_summary, time_t *rra_time)
1470 {
1471    unsigned long ds_idx, cdp_idx;
1472    infoval iv;
1473   
1474    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1475    {
1476       /* compute the cdp index */
1477       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1478 #ifdef DEBUG
1479           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1480              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1481              rrd -> rra_def[rra_idx].cf_nam);
1482 #endif 
1483       if (pcdp_summary != NULL)
1484           {
1485              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1486              /* append info to the return hash */
1487                  pcdp_summary = info_push(pcdp_summary,
1488                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1489                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1490                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1491          RD_I_VAL, iv);
1492           }
1493           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1494                  sizeof(rrd_value_t),1,rrd_file) != 1)
1495           { 
1496              rrd_set_error("writing rrd");
1497              return 0;
1498           }
1499           *rra_current += sizeof(rrd_value_t);
1500         }
1501         return (pcdp_summary);
1502 }