Change updatev RRA return from index_number to cf_nam, pdp_cnt.
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.10  2003/04/29 19:14:12  jake
9  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
10  * Also revert accidental addition of -I to aclocal MakeMakefile.
11  *
12  * Revision 1.9  2003/04/25 18:35:08  jake
13  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
14  *
15  * Revision 1.8  2003/03/31 21:22:12  oetiker
16  * enables RRDtool updates with microsecond or in case of windows millisecond
17  * precision. This is needed to reduce time measurement error when archive step
18  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
19  *
20  * Revision 1.7  2003/02/13 07:05:27  oetiker
21  * Find attached the patch I promised to send to you. Please note that there
22  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
23  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
24  * library is identical to librrd, but it contains support code for per-thread
25  * global variables currently used for error information only. This is similar
26  * to how errno per-thread variables are implemented.  librrd_th must be linked
27  * alongside of libpthred
28  *
29  * There is also a new file "THREADS", holding some documentation.
30  *
31  * -- Peter Stamfest <peter@stamfest.at>
32  *
33  * Revision 1.6  2002/02/01 20:34:49  oetiker
34  * fixed version number and date/time
35  *
36  * Revision 1.5  2001/05/09 05:31:01  oetiker
37  * Bug fix: when update of multiple PDP/CDP RRAs coincided
38  * with interpolation of multiple PDPs an incorrect value was
39  * stored as the CDP. Especially evident for GAUGE data sources.
40  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
41  *
42  * Revision 1.4  2001/03/10 23:54:41  oetiker
43  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
44  * parser and calculator from rrd_graph and puts then in a new file,
45  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
46  * clean-up of aberrant behavior stuff, including a bug fix.
47  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
48  * -- Jake Brutlag <jakeb@corp.webtv.net>
49  *
50  * Revision 1.3  2001/03/04 13:01:55  oetiker
51  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
52  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
53  * This is backwards compatible! But new files using the Aberrant stuff are not readable
54  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
55  * -- Jake Brutlag <jakeb@corp.webtv.net>
56  *
57  * Revision 1.2  2001/03/04 11:14:25  oetiker
58  * added at-style-time@value:value syntax to rrd_update
59  * --  Dave Bodenstab <imdave@mcs.net>
60  *
61  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
62  * checkin
63  *
64  *****************************************************************************/
65
66 #include "rrd_tool.h"
67 #include <sys/types.h>
68 #include <fcntl.h>
69
70 #ifdef WIN32
71  #include <sys/locking.h>
72  #include <sys/stat.h>
73  #include <io.h>
74 #endif
75
76 #include "rrd_hw.h"
77 #include "rrd_rpncalc.h"
78
79 #include "rrd_is_thread_safe.h"
80
81 #ifdef WIN32
/*
 * WIN32 has neither gettimeofday() nor struct timeval. This is a quick
 * and dirty replacement for both.
 */
86 #include <sys/timeb.h>
87
struct timeval {
        time_t tv_sec; /* seconds */
        long tv_usec;  /* microseconds */
};

struct __timezone {
        int  tz_minuteswest; /* minutes W of Greenwich */
        int  tz_dsttime;     /* type of dst correction */
};

/*
 * Minimal gettimeofday() stand-in built on _ftime().
 *
 * t   receives the current time; tv_usec is derived from the millisecond
 *     field of struct timeb, so its resolution is one millisecond.
 * tz  is accepted for call compatibility only and is never written.
 *
 * Always returns 0.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz) {
        struct timeb current_time;

        (void)tz;       /* timezone information is not provided */

        _ftime(&current_time);

        t->tv_sec  = current_time.time;
        t->tv_usec = current_time.millitm * 1000;

        return 0;
}
107
108 #endif
/*
 * Normalize a time value as returned by gettimeofday(): if the
 * microsecond part is negative, borrow one second so that tv_usec
 * ends up one million higher. Non-negative values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;

        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
120
121 /* Local prototypes */
122 int LockRRD(FILE *rrd_file);
123 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
124                                         unsigned long *rra_current,
125                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
126                                         info_t *pcdp_summary, time_t *rra_time);
127 int rrd_update_r(char *filename, char *template, int argc, char **argv);
128 int _rrd_update(char *filename, char *template, int argc, char **argv, 
129                                         info_t*);
130
/* Yield Y when X is NaN, otherwise X. No trailing semicolon, so the macro
 * can be used inside larger expressions; note that X is evaluated twice. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
132
133
#ifdef STANDALONE
/*
 * Stand-alone rrdupdate front end: run rrd_update() on the command line
 * and, if it left an error behind, print the usage text followed by the
 * error message. Exit status is 0 on success and 1 on error.
 */
int
main(int argc, char **argv)
{
        rrd_update(argc, argv);

        if (!rrd_test_error())
                return 0;

        printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
               "Usage: rrdupdate filename\n"
               "\t\t\t[--template|-t ds-name:ds-name:...]\n"
               "\t\t\ttime|N:value[:value...]\n\n"
               "\t\t\tat-time@value[:value...]\n\n"
               "\t\t\t[ time:value[:value...] ..]\n\n");

        printf("ERROR: %s\n", rrd_get_error());
        rrd_clear_error();
        return 1;
}
#endif
153
154 info_t *rrd_update_v(int argc, char **argv)
155 {
156     char             *template = NULL;          
157         info_t *result = NULL;
158         infoval rc;
159
160     while (1) {
161                 static struct option long_options[] =
162                         {
163                                 {"template",      required_argument, 0, 't'},
164                                 {0,0,0,0}
165                         };
166                 int option_index = 0;
167                 int opt;
168                 opt = getopt_long(argc, argv, "t:", 
169                                                   long_options, &option_index);
170                 
171                 if (opt == EOF)
172                         break;
173                 
174                 switch(opt) {
175                 case 't':
176                         template = optarg;
177                         break;
178                 
179                 case '?':
180                         rrd_set_error("unknown option '%s'",argv[optind-1]);
181             rc.u_int = -1;
182                         goto end_tag;
183                 }
184     }
185
186     /* need at least 2 arguments: filename, data. */
187     if (argc-optind < 2) {
188                 rrd_set_error("Not enough arguments");
189         rc.u_int = -1;
190                 goto end_tag;
191     }
192     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
193         rc.u_int = _rrd_update(argv[optind], template,
194                       argc - optind - 1, argv + optind + 1, result);
195     result->value.u_int = rc.u_int;
196 end_tag:
197     return result;
198 }
199
/*
 * rrd_update - command line style entry point for updating an RRD.
 *
 * argc/argv  [--template|-t ds-name:...] filename data...
 *
 * Parses the optional template, then forwards the filename and the
 * remaining data arguments to rrd_update_r(). Returns that function's
 * result, or -1 (with the error set through rrd_set_error()) for an
 * unknown option or when fewer than two non-option arguments remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template",      required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *template = NULL;
    int   option_index;
    int   opt;

    for (;;) {
        option_index = 0;
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF)
            break;

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* filename plus at least one data argument must be left over */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
242
243 int
244 rrd_update_r(char *filename, char *template, int argc, char **argv)
245 {
246    return _rrd_update(filename, template, argc, argv, NULL);
247 }
248
249 int
250 _rrd_update(char *filename, char *template, int argc, char **argv, 
251    info_t *pcdp_summary)
252 {
253
254     int              arg_i = 2;
255     short            j;
256     unsigned long    i,ii,iii=1;
257
258     unsigned long    rra_begin;          /* byte pointer to the rra
259                                           * area in the rrd file.  this
260                                           * pointer never changes value */
261     unsigned long    rra_start;          /* byte pointer to the rra
262                                           * area in the rrd file.  this
263                                           * pointer changes as each rrd is
264                                           * processed. */
265     unsigned long    rra_current;        /* byte pointer to the current write
266                                           * spot in the rrd file. */
267     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
268     double           interval,
269         pre_int,post_int;                /* interval between this and
270                                           * the last run */
271     unsigned long    proc_pdp_st;        /* which pdp_st was the last
272                                           * to be processed */
273     unsigned long    occu_pdp_st;        /* when was the pdp_st
274                                           * before the last update
275                                           * time */
276     unsigned long    proc_pdp_age;       /* how old was the data in
277                                           * the pdp prep area when it
278                                           * was last updated */
279     unsigned long    occu_pdp_age;       /* how long ago was the last
280                                           * pdp_step time */
281     rrd_value_t      *pdp_new;           /* prepare the incoming data
282                                           * to be added the the
283                                           * existing entry */
284     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
285                                           * to be added the the
286                                           * cdp values */
287
288     long             *tmpl_idx;          /* index representing the settings
289                                             transported by the template index */
290     unsigned long    tmpl_cnt = 2;       /* time and data */
291
292     FILE             *rrd_file;
293     rrd_t            rrd;
294     time_t           current_time;
295         time_t           rra_time; /* time of update for a RRA */
296     unsigned long    current_time_usec;  /* microseconds part of current time */
297     struct timeval   tmp_time;           /* used for time conversion */
298
299     char             **updvals;
300     int              schedule_smooth = 0;
301         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
302                                          /* a vector of future Holt-Winters seasonal coefs */
303     unsigned long    elapsed_pdp_st;
304                                          /* number of elapsed PDP steps since last update */
305     unsigned long    *rra_step_cnt = NULL;
306                                          /* number of rows to be updated in an RRA for a data
307                                           * value. */
308     unsigned long    start_pdp_offset;
309                                          /* number of PDP steps since the last update that
310                                           * are assigned to the first CDP to be generated
311                                           * since the last update. */
312     unsigned short   scratch_idx;
313                                          /* index into the CDP scratch array */
314     enum cf_en       current_cf;
315                                          /* numeric id of the current consolidation function */
316     rpnstack_t       rpnstack; /* used for COMPUTE DS */
317     int              version;  /* rrd version */
318
319     rpnstack_init(&rpnstack);
320
321     /* need at least 1 arguments: data. */
322     if (argc < 1) {
323         rrd_set_error("Not enough arguments");
324         return -1;
325     }
326     
327     
328
329     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
330         return -1;
331     }
332     /* initialize time */
333     version = atoi(rrd.stat_head->version);
334     gettimeofday(&tmp_time, 0);
335     normalize_time(&tmp_time);
336     current_time = tmp_time.tv_sec;
337     if(version >= 3) {
338         current_time_usec = tmp_time.tv_usec;
339     }
340     else {
341         current_time_usec = 0;
342     }
343
344     rra_current = rra_start = rra_begin = ftell(rrd_file);
345     /* This is defined in the ANSI C standard, section 7.9.5.3:
346
347         When a file is opened with udpate mode ('+' as the second
348         or third character in the ... list of mode argument
349         variables), both input and ouptut may be performed on the
350         associated stream.  However, ...  input may not be directly
351         followed by output without an intervening call to a file
352         positioning function, unless the input oepration encounters
353         end-of-file. */
354     fseek(rrd_file, 0, SEEK_CUR);
355
356     
357     /* get exclusive lock to whole file.
358      * lock gets removed when we close the file.
359      */
360     if (LockRRD(rrd_file) != 0) {
361       rrd_set_error("could not lock RRD");
362       rrd_free(&rrd);
363       fclose(rrd_file);
364       return(-1);   
365     } 
366
367     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
368         rrd_set_error("allocating updvals pointer array");
369         rrd_free(&rrd);
370         fclose(rrd_file);
371         return(-1);
372     }
373
374     if ((pdp_temp = malloc(sizeof(rrd_value_t)
375                            *rrd.stat_head->ds_cnt))==NULL){
376         rrd_set_error("allocating pdp_temp ...");
377         free(updvals);
378         rrd_free(&rrd);
379         fclose(rrd_file);
380         return(-1);
381     }
382
383     if ((tmpl_idx = malloc(sizeof(unsigned long)
384                            *(rrd.stat_head->ds_cnt+1)))==NULL){
385         rrd_set_error("allocating tmpl_idx ...");
386         free(pdp_temp);
387         free(updvals);
388         rrd_free(&rrd);
389         fclose(rrd_file);
390         return(-1);
391     }
392     /* initialize template redirector */
393     /* default config example (assume DS 1 is a CDEF DS)
394        tmpl_idx[0] -> 0; (time)
395        tmpl_idx[1] -> 1; (DS 0)
396        tmpl_idx[2] -> 3; (DS 2)
397        tmpl_idx[3] -> 4; (DS 3) */
398     tmpl_idx[0] = 0; /* time */
399     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
400         {
401            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
402               tmpl_idx[ii++]=i;
403         }
404     tmpl_cnt= ii;
405
406     if (template) {
407         char *dsname;
408         unsigned int tmpl_len;
409         dsname = template;
410         tmpl_cnt = 1; /* the first entry is the time */
411         tmpl_len = strlen(template);
412         for(i=0;i<=tmpl_len ;i++) {
413             if (template[i] == ':' || template[i] == '\0') {
414                 template[i] = '\0';
415                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
416                     rrd_set_error("Template contains more DS definitions than RRD");
417                     free(updvals); free(pdp_temp);
418                     free(tmpl_idx); rrd_free(&rrd);
419                     fclose(rrd_file); return(-1);
420                 }
421                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
422                     rrd_set_error("unknown DS name '%s'",dsname);
423                     free(updvals); free(pdp_temp);
424                     free(tmpl_idx); rrd_free(&rrd);
425                     fclose(rrd_file); return(-1);
426                 } else {
427                   /* the first element is always the time */
428                   tmpl_idx[tmpl_cnt-1]++; 
429                   /* go to the next entry on the template */
430                   dsname = &template[i+1];
431                   /* fix the damage we did before */
432                   if (i<tmpl_len) {
433                      template[i]=':';
434                   } 
435
436                 }
437             }       
438         }
439     }
440     if ((pdp_new = malloc(sizeof(rrd_value_t)
441                           *rrd.stat_head->ds_cnt))==NULL){
442         rrd_set_error("allocating pdp_new ...");
443         free(updvals);
444         free(pdp_temp);
445         free(tmpl_idx);
446         rrd_free(&rrd);
447         fclose(rrd_file);
448         return(-1);
449     }
450
451     /* loop through the arguments. */
452     for(arg_i=0; arg_i<argc;arg_i++) {
453         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
454         char *step_start = stepper;
455         char *p;
456         char *parsetime_error = NULL;
457         enum {atstyle, normal} timesyntax;
458         struct time_value ds_tv;
459         if (stepper == NULL){
460                 rrd_set_error("failed duplication argv entry");
461                 free(updvals);
462                 free(pdp_temp);  
463                 free(tmpl_idx);
464                 rrd_free(&rrd);
465                 fclose(rrd_file);
466                 return(-1);
467          }
468         /* initialize all ds input to unknown except the first one
469            which has always got to be set */
470         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471         strcpy(stepper,argv[arg_i]);
472         updvals[0]=stepper;
473         /* separate all ds elements; first must be examined separately
474            due to alternate time syntax */
475         if ((p=strchr(stepper,'@'))!=NULL) {
476             timesyntax = atstyle;
477             *p = '\0';
478             stepper = p+1;
479         } else if ((p=strchr(stepper,':'))!=NULL) {
480             timesyntax = normal;
481             *p = '\0';
482             stepper = p+1;
483         } else {
484             rrd_set_error("expected timestamp not found in data source from %s:...",
485                           argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         ii=1;
490         updvals[tmpl_idx[ii]] = stepper;
491         while (*stepper) {
492             if (*stepper == ':') {
493                 *stepper = '\0';
494                 ii++;
495                 if (ii<tmpl_cnt){                   
496                     updvals[tmpl_idx[ii]] = stepper+1;
497                 }
498             }
499             stepper++;
500         }
501
502         if (ii != tmpl_cnt-1) {
503             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504                           tmpl_cnt-1, ii, argv[arg_i]);
505             free(step_start);
506             break;
507         }
508         
509         /* get the time from the reading ... handle N */
510         if (timesyntax == atstyle) {
511             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513                 free(step_start);
514                 break;
515             }
516             if (ds_tv.type == RELATIVE_TO_END_TIME ||
517                 ds_tv.type == RELATIVE_TO_START_TIME) {
518                 rrd_set_error("specifying time relative to the 'start' "
519                               "or 'end' makes no sense here: %s",
520                               updvals[0]);
521                 free(step_start);
522                 break;
523             }
524
525             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
527             
528         } else if (strcmp(updvals[0],"N")==0){
529             gettimeofday(&tmp_time, 0);
530             normalize_time(&tmp_time);
531             current_time = tmp_time.tv_sec;
532             current_time_usec = tmp_time.tv_usec;
533         } else {
534             double tmp;
535             tmp = strtod(updvals[0], 0);
536             current_time = floor(tmp);
537             current_time_usec = (long)((tmp - current_time) * 1000000L);
538         }
539         /* dont do any correction for old version RRDs */
540         if(version < 3) 
541             current_time_usec = 0;
542         
543         if(current_time <= rrd.live_head->last_up){
544             rrd_set_error("illegal attempt to update using time %ld when "
545                           "last update time is %ld (minimum one second step)",
546                           current_time, rrd.live_head->last_up);
547             free(step_start);
548             break;
549         }
550         
551         
552         /* seek to the beginning of the rra's */
553         if (rra_current != rra_begin) {
554             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
555                 rrd_set_error("seek error in rrd");
556                 free(step_start);
557                 break;
558             }
559             rra_current = rra_begin;
560         }
561         rra_start = rra_begin;
562
563         /* when was the current pdp started */
564         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
566
567         /* when did the last pdp_st occur */
568         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569         occu_pdp_st = current_time - occu_pdp_age;
570         /* interval = current_time - rrd.live_head->last_up; */
571         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
572     
573         if (occu_pdp_st > proc_pdp_st){
574             /* OK we passed the pdp_st moment*/
575             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
576                                                               * occurred before the latest
577                                                               * pdp_st moment*/
578             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
579             post_int = occu_pdp_age;                         /* how much after it */
580             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
581         } else {
582             pre_int = interval;
583             post_int = 0;
584         }
585
586 #ifdef DEBUG
587         printf(
588                "proc_pdp_age %lu\t"
589                "proc_pdp_st %lu\t" 
590                "occu_pfp_age %lu\t" 
591                "occu_pdp_st %lu\t"
592                "int %lf\t"
593                "pre_int %lf\t"
594                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
595                 occu_pdp_age, occu_pdp_st,
596                interval, pre_int, post_int);
597 #endif
598     
599         /* process the data sources and update the pdp_prep 
600          * area accordingly */
601         for(i=0;i<rrd.stat_head->ds_cnt;i++){
602             enum dst_en dst_idx;
603             dst_idx= dst_conv(rrd.ds_def[i].dst);
604                 /* NOTE: DST_CDEF should never enter this if block, because
605                  * updvals[i+1][0] is initialized to 'U'; unless the caller
606                  * accidently specified a value for the DST_CDEF. To handle 
607                  * this case, an extra check is required. */
608             if((updvals[i+1][0] != 'U') &&
609                    (dst_idx != DST_CDEF) &&
610                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
611                double rate = DNAN;
612                /* the data source type defines how to process the data */
613                 /* pdp_new contains rate * time ... eg the bytes
614                  * transferred during the interval. Doing it this way saves
615                  * a lot of math operations */
616                 
617
618                 switch(dst_idx){
619                 case DST_COUNTER:
620                 case DST_DERIVE:
621                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
622                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
623                        if(dst_idx == DST_COUNTER) {
624                           /* simple overflow catcher sugestet by andres kroonmaa */
625                           /* this will fail terribly for non 32 or 64 bit counters ... */
626                           /* are there any others in SNMP land ? */
627                           if (pdp_new[i] < (double)0.0 ) 
628                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
629                           if (pdp_new[i] < (double)0.0 ) 
630                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
631                        }
632                        rate = pdp_new[i] / interval;
633                     }
634                    else {
635                      pdp_new[i]= DNAN;          
636                    }
637                    break;
638                 case DST_ABSOLUTE:
639                     pdp_new[i]= atof(updvals[i+1]);
640                     rate = pdp_new[i] / interval;                 
641                     break;
642                 case DST_GAUGE:
643                     pdp_new[i] = atof(updvals[i+1]) * interval;
644                     rate = pdp_new[i] / interval;                  
645                     break;
646                 default:
647                     rrd_set_error("rrd contains unknown DS type : '%s'",
648                                   rrd.ds_def[i].dst);
649                     break;
650                 }
651                 /* break out of this for loop if the error string is set */
652                 if (rrd_test_error()){
653                     break;
654                 }
655                /* make sure pdp_temp is neither too large or too small
656                 * if any of these occur it becomes unknown ...
657                 * sorry folks ... */
658                if ( ! isnan(rate) && 
659                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
660                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
661                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
662                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
663                   pdp_new[i] = DNAN;
664                }               
665             } else {
666                 /* no news is news all the same */
667                 pdp_new[i] = DNAN;
668             }
669             
670             /* make a copy of the command line argument for the next run */
671 #ifdef DEBUG
672             fprintf(stderr,
673                     "prep ds[%lu]\t"
674                     "last_arg '%s'\t"
675                     "this_arg '%s'\t"
676                     "pdp_new %10.2f\n",
677                     i,
678                     rrd.pdp_prep[i].last_ds,
679                     updvals[i+1], pdp_new[i]);
680 #endif
681             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
682                 strncpy(rrd.pdp_prep[i].last_ds,
683                         updvals[i+1],LAST_DS_LEN-1);
684                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
685             }
686         }
687         /* break out of the argument parsing loop if the error_string is set */
688         if (rrd_test_error()){
689             free(step_start);
690             break;
691         }
692         /* has a pdp_st moment occurred since the last run ? */
693
694         if (proc_pdp_st == occu_pdp_st){
695             /* no we have not passed a pdp_st moment. therefore update is simple */
696
697             for(i=0;i<rrd.stat_head->ds_cnt;i++){
698                 if(isnan(pdp_new[i]))
699                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
700                 else
701                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
702 #ifdef DEBUG
703                 fprintf(stderr,
704                         "NO PDP  ds[%lu]\t"
705                         "value %10.2f\t"
706                         "unkn_sec %5lu\n",
707                         i,
708                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
709                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
710 #endif
711             }   
712         } else {
713             /* an pdp_st has occurred. */
714
715             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
716              * occurred up to the last run.        
717             pdp_new[] contains rate*seconds from the latest run.
718             pdp_temp[] will contain the rate for cdp */
719
720             for(i=0;i<rrd.stat_head->ds_cnt;i++){
721                 /* update pdp_prep to the current pdp_st */
722                 if(isnan(pdp_new[i]))
723                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
724                 else
725                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
726                         pdp_new[i]/(double)interval*(double)pre_int;
727
728                 /* if too much of the pdp_prep is unknown we dump it */
729                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
730                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
731                     (occu_pdp_st-proc_pdp_st <= 
732                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
733                     pdp_temp[i] = DNAN;
734                 } else {
735                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
736                         / (double)( occu_pdp_st
737                                    - proc_pdp_st
738                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739                 }
740
741                 /* process CDEF data sources; remember each CDEF DS can
742                  * only reference other DS with a lower index number */
743             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
744                    rpnp_t *rpnp;
745                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
746                    /* substitute data values for OP_VARIABLE nodes */
747                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
748                    {
749                           if (rpnp[ii].op == OP_VARIABLE) {
750                                  rpnp[ii].op = OP_NUMBER;
751                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
752                           }
753                    }
754                    /* run the rpn calculator */
755                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
756                           free(rpnp);
757                           break; /* exits the data sources pdp_temp loop */
758                    }
759                 }
760         
761                 /* make pdp_prep ready for the next run */
762                 if(isnan(pdp_new[i])){
763                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
764                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
765                 } else {
766                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
767                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
768                         pdp_new[i]/(double)interval*(double)post_int;
769                 }
770
771 #ifdef DEBUG
772                 fprintf(stderr,
773                         "PDP UPD ds[%lu]\t"
774                         "pdp_temp %10.2f\t"
775                         "new_prep %10.2f\t"
776                         "new_unkn_sec %5lu\n",
777                         i, pdp_temp[i],
778                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
779                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
780 #endif
781             }
782
783                 /* if there were errors during the last loop, bail out here */
784             if (rrd_test_error()){
785                free(step_start);
786                break;
787             }
788
789                 /* compute the number of elapsed pdp_st moments */
790                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
791 #ifdef DEBUG
792                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
793 #endif
794                 if (rra_step_cnt == NULL)
795                 {
796                    rra_step_cnt = (unsigned long *) 
797                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
798                 }
799
800             for(i = 0, rra_start = rra_begin;
801                 i < rrd.stat_head->rra_cnt;
802             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
803                 i++)
804                 {
805                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
806                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
807                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
808         if (start_pdp_offset <= elapsed_pdp_st) {
809            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
810                       rrd.rra_def[i].pdp_cnt + 1;
811             } else {
812                    rra_step_cnt[i] = 0;
813                 }
814
815                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
816                 {
817                    /* If this is a bulk update, we need to skip ahead in the seasonal
818                         * arrays so that they will be correct for the next observed value;
819                         * note that for the bulk update itself, no update will occur to
820                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
821                         * be set to DNAN. */
822            if (rra_step_cnt[i] > 2) 
823                    {
824                           /* skip update by resetting rra_step_cnt[i],
825                            * note that this is not data source specific; this is due
826                            * to the bulk update, not a DNAN value for the specific data
827                            * source. */
828                           rra_step_cnt[i] = 0;
829               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
830                              &last_seasonal_coef);
831                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
832                              &seasonal_coef);
833                    }
834                 
835                   /* periodically run a smoother for seasonal effects */
836                   /* Need to use first cdp parameter buffer to track
837                    * burnin (burnin requires a specific smoothing schedule).
838                    * The CDP_init_seasonal parameter is really an RRA level,
839                    * not a data source within RRA level parameter, but the rra_def
840                    * is read only for rrd_update (not flushed to disk). */
841                   iii = i*(rrd.stat_head -> ds_cnt);
842                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
843                           <= BURNIN_CYCLES)
844                   {
845                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
846                                  > rrd.rra_def[i].row_cnt - 1) {
847                            /* mark off one of the burnin cycles */
848                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
849                        schedule_smooth = 1;
850                          }  
851                   } else {
852                          /* someone has no doubt invented a trick to deal with this
853                           * wrap around, but at least this code is clear. */
854                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
855                              rrd.rra_ptr[i].cur_row)
856                          {
857                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
858                                   * mapping between PDP and CDP */
859                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
860                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
861                                  {
862 #ifdef DEBUG
863                                         fprintf(stderr,
864                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
865                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
866                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
867 #endif
868                                         schedule_smooth = 1;
869                                  }
870              } else {
871                                  /* can't rely on negative numbers because we are working with
872                                   * unsigned values */
873                                  /* Don't need modulus here. If we've wrapped more than once, only
874                                   * one smooth is executed at the end. */
875                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
876                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
877                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
878                                  {
879 #ifdef DEBUG
880                                         fprintf(stderr,
881                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
882                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
883                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
884 #endif
885                                         schedule_smooth = 1;
886                                  }
887                          }
888                   }
889
890               rra_current = ftell(rrd_file); 
891                 } /* if cf is DEVSEASONAL or SEASONAL */
892
893         if (rrd_test_error()) break;
894
895                     /* update CDP_PREP areas */
896                     /* loop over data sources within each RRA */
897                     for(ii = 0;
898                         ii < rrd.stat_head->ds_cnt;
899                         ii++)
900                         {
901                         
902                         /* iii indexes the CDP prep area for this data source within the RRA */
903                         iii=i*rrd.stat_head->ds_cnt+ii;
904
905                         if (rrd.rra_def[i].pdp_cnt > 1) {
906                           
907                            if (rra_step_cnt[i] > 0) {
908                            /* If we are in this block, at least 1 CDP value will be written to
909                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
910                                 * to be written, then the "fill in" value is the CDP_secondary_val
911                                 * entry. */
912                                   if (isnan(pdp_temp[ii]))
913                   {
914                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
915                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
916                                   } else {
917                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
918                                           * CDP data entries. No matter the CF, the value is the same because
919                                           * the average, max, min, and last of a list of identical values is
920                                           * the same, namely, the value itself. */
921                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
922                                   }
923                      
924                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
925                                       > rrd.rra_def[i].pdp_cnt*
926                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
927                                   {
928                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
929                                          /* initialize carry over */
930                                          if (current_cf == CF_AVERAGE) {
931                                                    if (isnan(pdp_temp[ii])) { 
932                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
933                                                    } else {
934                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
935                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
936                                                    }
937                                          } else {
938                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
939                                          }
940                                   } else {
941                                          rrd_value_t cum_val, cur_val; 
942                                      switch (current_cf) {
943                                                 case CF_AVERAGE:
944                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
945                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
946                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
947                                                (cum_val + cur_val * start_pdp_offset) /
948                                            (rrd.rra_def[i].pdp_cnt
949                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
950                                                    /* initialize carry over value */
951                                                    if (isnan(pdp_temp[ii])) { 
952                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
953                                                    } else {
954                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
955                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
956                                                    }
957                                                    break;
958                                                 case CF_MAXIMUM:
959                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
960                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
961 #ifdef DEBUG
962                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
963                                                           isnan(pdp_temp[ii])) {
964                                                      fprintf(stderr,
965                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
966                                                                 i,ii);
967                                                          exit(-1);
968                                                   }
969 #endif
970                                                   if (cur_val > cum_val)
971                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
972                                                   else
973                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
974                                                   /* initialize carry over value */
975                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
976                                                   break;
977                                                 case CF_MINIMUM:
978                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
979                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
980 #ifdef DEBUG
981                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
982                                                           isnan(pdp_temp[ii])) {
983                                                      fprintf(stderr,
984                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
985                                                                 i,ii);
986                                                          exit(-1);
987                                                   }
988 #endif
989                                                   if (cur_val < cum_val)
990                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
991                                                   else
992                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
993                                                   /* initialize carry over value */
994                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
995                                                   break;
996                                                 case CF_LAST:
997                                                 default:
998                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
999                                                    /* initialize carry over value */
1000                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1001                                                 break;
1002                                          }
1003                                   } /* endif meets xff value requirement for a valid value */
1004                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1005                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1006                                   if (isnan(pdp_temp[ii]))
1007                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1008                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1009                                   else
1010                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1011                } else  /* rra_step_cnt[i]  == 0 */
1012                            {
1013 #ifdef DEBUG
1014                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1015                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1016                                          i,ii);
1017                                   } else {
1018                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1019                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1020                                   }
1021 #endif
1022                                   if (isnan(pdp_temp[ii])) {
1023                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1024                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1025                                   {
1026                                          if (current_cf == CF_AVERAGE) {
1027                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1028                                                    elapsed_pdp_st;
1029                                          } else {
1030                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1031                                          }
1032 #ifdef DEBUG
1033                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1034                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1035 #endif
1036                                   } else {
1037                                          switch (current_cf) {
1038                                          case CF_AVERAGE:
1039                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1040                                                    elapsed_pdp_st;
1041                                                 break;
1042                                          case CF_MINIMUM:
1043                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045                                                 break; 
1046                                          case CF_MAXIMUM:
1047                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1048                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049                                                 break; 
1050                                          case CF_LAST:
1051                                          default:
1052                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1053                                                 break;
1054                                          }
1055                                   }
1056                            }
1057                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1058                            if (elapsed_pdp_st > 2)
1059                            {
1060                                    switch (current_cf) {
1061                                    case CF_AVERAGE:
1062                                    default:
1063                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1064                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1065                                           break;
1066                    case CF_SEASONAL:
1067                                    case CF_DEVSEASONAL:
1068                                           /* need to update cached seasonal values, so they are consistent
1069                                            * with the bulk update */
1070                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1071                                            * CDP_last_deviation are the same. */
1072                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1073                                                  last_seasonal_coef[ii];
1074                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1075                                                  seasonal_coef[ii];
1076                                           break;
1077                    case CF_HWPREDICT:
1078                                           /* need to update the null_count and last_null_count.
1079                                            * even do this for non-DNAN pdp_temp because the
1080                                            * algorithm is not learning from batch updates. */
1081                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1082                                                  elapsed_pdp_st;
1083                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1084                                                  elapsed_pdp_st - 1;
1085                                           /* fall through */
1086                                    case CF_DEVPREDICT:
1087                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1088                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1089                                           break;
1090                    case CF_FAILURES:
1091                                           /* do not count missed bulk values as failures */
1092                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1093                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1094                                           /* need to reset violations buffer.
1095                                            * could do this more carefully, but for now, just
1096                                            * assume a bulk update wipes away all violations. */
1097                       erase_violations(&rrd, iii, i);
1098                                           break;
1099                                    }
1100                            } 
1101                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1102
1103                         if (rrd_test_error()) break;
1104
1105                         } /* endif data sources loop */
1106         } /* end RRA Loop */
1107
1108                 /* this loop is only entered if elapsed_pdp_st < 3 */
1109                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1110                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1111                 {
1112                for(i = 0, rra_start = rra_begin;
1113                    i < rrd.stat_head->rra_cnt;
1114                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1115                    i++)
1116                    {
1117                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1118
1119                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1120                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1121                           {
1122                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1123                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1124                                 &seasonal_coef);
1125                  rra_current = ftell(rrd_file);
1126                           }
1127                           if (rrd_test_error()) break;
1128                       /* loop over data sources within each RRA */
1129                       for(ii = 0;
1130                           ii < rrd.stat_head->ds_cnt;
1131                           ii++)
1132                           {
1133                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1134                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1135                                     scratch_idx, seasonal_coef);
1136                           }
1137            } /* end RRA Loop */
1138                    if (rrd_test_error()) break;
1139             } /* end elapsed_pdp_st loop */
1140
1141                 if (rrd_test_error()) break;
1142
1143                 /* Ready to write to disk */
1144                 /* Move sequentially through the file, writing one RRA at a time.
1145                  * Note this architecture divorces the computation of CDP with
1146                  * flushing updated RRA entries to disk. */
1147             for(i = 0, rra_start = rra_begin;
1148                 i < rrd.stat_head->rra_cnt;
1149             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1150                 i++) {
1151                 /* is there anything to write for this RRA? If not, continue. */
1152         if (rra_step_cnt[i] == 0) continue;
1153
1154                 /* write the first row */
1155 #ifdef DEBUG
1156         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1157 #endif
1158             rrd.rra_ptr[i].cur_row++;
1159             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1160                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1161                 /* position on the first row */
1162                 rra_pos_tmp = rra_start +
1163                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1164                 if(rra_pos_tmp != rra_current) {
1165                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1166                       rrd_set_error("seek error in rrd");
1167                       break;
1168                    }
1169                    rra_current = rra_pos_tmp;
1170                 }
1171
1172 #ifdef DEBUG
1173             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1174 #endif
1175                 scratch_idx = CDP_primary_val;
1176                 if (pcdp_summary != NULL)
1177                 {
1178                    rra_time = (current_time - current_time 
1179                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1180                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1181                 }
1182                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1183                    pcdp_summary, &rra_time);
1184                 if (rrd_test_error()) break;
1185
1186                 /* write other rows of the bulk update, if any */
1187                 scratch_idx = CDP_secondary_val;
1188                 for ( ; rra_step_cnt[i] > 1; 
1189                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1190                 {
1191                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1192                    {
1193 #ifdef DEBUG
1194               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1195                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1196 #endif
1197                           /* wrap */
1198                           rrd.rra_ptr[i].cur_row = 0;
1199                           /* seek back to beginning of current rra */
1200                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1201                           {
1202                          rrd_set_error("seek error in rrd");
1203                          break;
1204                           }
1205 #ifdef DEBUG
1206                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1207 #endif
1208                           rra_current = rra_start;
1209                    }
1210                    if (pcdp_summary != NULL)
1211                    {
1212                       rra_time = (current_time - current_time 
1213                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1214                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1215                    }
1216                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1217                       pcdp_summary, &rra_time);
1218                 }
1219                 
1220                 if (rrd_test_error())
1221                   break;
1222                 } /* RRA LOOP */
1223
1224             /* break out of the argument parsing loop if error_string is set */
1225             if (rrd_test_error()){
1226                    free(step_start);
1227                    break;
1228             } 
1229             
1230         } /* endif a pdp_st has occurred */ 
1231         rrd.live_head->last_up = current_time;
1232         rrd.live_head->last_up_usec = current_time_usec; 
1233         free(step_start);
1234     } /* function argument loop */
1235
1236     if (seasonal_coef != NULL) free(seasonal_coef);
1237     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1238         if (rra_step_cnt != NULL) free(rra_step_cnt);
1239     rpnstack_free(&rpnstack);
1240
1241     /* if we got here and if there is an error and if the file has not been
1242      * written to, then close things up and return. */
1243     if (rrd_test_error()) {
1244         free(updvals);
1245         free(tmpl_idx);
1246         rrd_free(&rrd);
1247         free(pdp_temp);
1248         free(pdp_new);
1249         fclose(rrd_file);
1250         return(-1);
1251     }
1252
1253     /* aargh ... that was tough ... so many loops ... anyway, its done.
1254      * we just need to write back the live header portion now*/
1255
1256     if (fseek(rrd_file, (sizeof(stat_head_t)
1257                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1258                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1259               SEEK_SET) != 0) {
1260         rrd_set_error("seek rrd for live header writeback");
1261         free(updvals);
1262         free(tmpl_idx);
1263         rrd_free(&rrd);
1264         free(pdp_temp);
1265         free(pdp_new);
1266         fclose(rrd_file);
1267         return(-1);
1268     }
1269
1270     if(version >= 3) {
1271             if(fwrite( rrd.live_head,
1272                        sizeof(live_head_t), 1, rrd_file) != 1){
1273                 rrd_set_error("fwrite live_head to rrd");
1274                 free(updvals);
1275                 rrd_free(&rrd);
1276                 free(tmpl_idx);
1277                 free(pdp_temp);
1278                 free(pdp_new);
1279                 fclose(rrd_file);
1280                 return(-1);
1281             }
1282     }
1283     else {
1284             if(fwrite( &rrd.live_head->last_up,
1285                        sizeof(time_t), 1, rrd_file) != 1){
1286                 rrd_set_error("fwrite live_head to rrd");
1287                 free(updvals);
1288                 rrd_free(&rrd);
1289                 free(tmpl_idx);
1290                 free(pdp_temp);
1291                 free(pdp_new);
1292                 fclose(rrd_file);
1293                 return(-1);
1294             }
1295     }
1296             
1297
1298     if(fwrite( rrd.pdp_prep,
1299                sizeof(pdp_prep_t),
1300                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1301         rrd_set_error("ftwrite pdp_prep to rrd");
1302         free(updvals);
1303         rrd_free(&rrd);
1304         free(tmpl_idx);
1305         free(pdp_temp);
1306         free(pdp_new);
1307         fclose(rrd_file);
1308         return(-1);
1309     }
1310
1311     if(fwrite( rrd.cdp_prep,
1312                sizeof(cdp_prep_t),
1313                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1314        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1315
1316         rrd_set_error("ftwrite cdp_prep to rrd");
1317         free(updvals);
1318         free(tmpl_idx);
1319         rrd_free(&rrd);
1320         free(pdp_temp);
1321         free(pdp_new);
1322         fclose(rrd_file);
1323         return(-1);
1324     }
1325
1326     if(fwrite( rrd.rra_ptr,
1327                sizeof(rra_ptr_t), 
1328                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1329         rrd_set_error("fwrite rra_ptr to rrd");
1330         free(updvals);
1331         free(tmpl_idx);
1332         rrd_free(&rrd);
1333         free(pdp_temp);
1334         free(pdp_new);
1335         fclose(rrd_file);
1336         return(-1);
1337     }
1338
1339     /* OK now close the files and free the memory */
1340     if(fclose(rrd_file) != 0){
1341         rrd_set_error("closing rrd");
1342         free(updvals);
1343         free(tmpl_idx);
1344         rrd_free(&rrd);
1345         free(pdp_temp);
1346         free(pdp_new);
1347         return(-1);
1348     }
1349
1350     /* calling the smoothing code here guarantees at most
1351          * one smoothing operation per rrd_update call. Unfortunately,
1352          * it is possible with bulk updates, or a long-delayed update
1353          * for smoothing to occur off-schedule. This really isn't
1354          * critical except during the burning cycles. */
1355         if (schedule_smooth)
1356         {
1357 #ifndef WIN32
1358           rrd_file = fopen(filename,"r+");
1359 #else
1360           rrd_file = fopen(filename,"rb+");
1361 #endif
1362           rra_start = rra_begin;
1363           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1364           {
1365             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1366                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1367             {
1368 #ifdef DEBUG
1369               fprintf(stderr,"Running smoother for rra %ld\n",i);
1370 #endif
1371               apply_smoother(&rrd,i,rra_start,rrd_file);
1372               if (rrd_test_error())
1373                 break;
1374             }
1375             rra_start += rrd.rra_def[i].row_cnt
1376               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1377           }
1378           fclose(rrd_file);
1379         }
1380     rrd_free(&rrd);
1381     free(updvals);
1382     free(tmpl_idx);
1383     free(pdp_new);
1384     free(pdp_temp);
1385     return(0);
1386 }
1387
/*
 * Request an exclusive lock on the complete RRD file without blocking.
 *
 * rrdfile: open stdio stream of the RRD file; its descriptor is obtained
 *          with fileno().
 *
 * On non-WIN32 builds a write lock is requested through fcntl(F_SETLK),
 * starting at offset 0 relative to the beginning of the file (SEEK_SET)
 * with length 0, i.e. the whole file.  On WIN32 builds the current file
 * size is looked up with _fstat() and that many bytes are locked with
 * _locking(_LK_NBLCK); if _fstat() fails, -1 is returned.
 *
 * The lock gets removed when the file is closed.
 *
 * Returns 0 on success, otherwise the failure value of the underlying
 * locking call.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */
    int result;
#ifndef WIN32
    struct flock whole_file;

    whole_file.l_type   = F_WRLCK;   /* exclusive write lock */
    whole_file.l_whence = SEEK_SET;  /* l_start counts from start of file */
    whole_file.l_start  = 0;         /* begin at the first byte */
    whole_file.l_len    = 0;         /* 0 = whole file */

    result = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat st;

    /* _locking() needs a byte count, so fetch the current file size */
    if (_fstat(fd, &st) != 0) {
        result = -1;
    } else {
        result = _locking(fd, _LK_NBLCK, st.st_size);
    }
#endif

    return(result);
}
1424
1425
1426 info_t
1427 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1428                unsigned short CDP_scratch_idx, FILE *rrd_file,
1429                    info_t *pcdp_summary, time_t *rra_time)
1430 {
1431    unsigned long ds_idx, cdp_idx;
1432    infoval iv;
1433   
1434    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1435    {
1436       /* compute the cdp index */
1437       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1438 #ifdef DEBUG
1439           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1440              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1441              rrd -> rra_def[rra_idx].cf_nam);
1442 #endif 
1443       if (pcdp_summary != NULL)
1444           {
1445              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1446              /* append info to the return hash */
1447                  pcdp_summary = info_push(pcdp_summary,
1448                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1449                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1450                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1451          RD_I_VAL, iv);
1452           }
1453           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1454                  sizeof(rrd_value_t),1,rrd_file) != 1)
1455           { 
1456              rrd_set_error("writing rrd");
1457                  return;
1458           }
1459           *rra_current += sizeof(rrd_value_t);
1460         }
1461         return (pcdp_summary);
1462 }