fixed version checks to only complain if xml version is > than current RRD version
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.8  2003/03/31 21:22:12  oetiker
9  * enables RRDtool updates with microsecond or in case of windows millisecond
10  * precision. This is needed to reduce time measurement error when archive step
11  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
12  *
13  * Revision 1.7  2003/02/13 07:05:27  oetiker
14  * Find attached the patch I promised to send to you. Please note that there
15  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
16  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
17  * library is identical to librrd, but it contains support code for per-thread
18  * global variables currently used for error information only. This is similar
19  * to how errno per-thread variables are implemented.  librrd_th must be linked
20  * alongside of libpthred
21  *
22  * There is also a new file "THREADS", holding some documentation.
23  *
24  * -- Peter Stamfest <peter@stamfest.at>
25  *
26  * Revision 1.6  2002/02/01 20:34:49  oetiker
27  * fixed version number and date/time
28  *
29  * Revision 1.5  2001/05/09 05:31:01  oetiker
30  * Bug fix: when update of multiple PDP/CDP RRAs coincided
31  * with interpolation of multiple PDPs an incorrect value was
32  * stored as the CDP. Especially evident for GAUGE data sources.
33  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
34  *
35  * Revision 1.4  2001/03/10 23:54:41  oetiker
36  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
37  * parser and calculator from rrd_graph and puts then in a new file,
38  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
39  * clean-up of aberrant behavior stuff, including a bug fix.
40  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
41  * -- Jake Brutlag <jakeb@corp.webtv.net>
42  *
43  * Revision 1.3  2001/03/04 13:01:55  oetiker
44  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
45  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
46  * This is backwards compatible! But new files using the Aberrant stuff are not readable
47  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
48  * -- Jake Brutlag <jakeb@corp.webtv.net>
49  *
50  * Revision 1.2  2001/03/04 11:14:25  oetiker
51  * added at-style-time@value:value syntax to rrd_update
52  * --  Dave Bodenstab <imdave@mcs.net>
53  *
54  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
55  * checkin
56  *
57  *****************************************************************************/
58
59 #include "rrd_tool.h"
60 #include <sys/types.h>
61 #include <fcntl.h>
62
63 #ifdef WIN32
64  #include <sys/locking.h>
65  #include <sys/stat.h>
66  #include <io.h>
67 #endif
68
69 #include "rrd_hw.h"
70 #include "rrd_rpncalc.h"
71
72 #include "rrd_is_thread_safe.h"
73
74 #ifdef WIN32
75 /*
76  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
77  * replacement.
78  */
79 #include <sys/timeb.h>
80
81 struct timeval {
82         time_t tv_sec; /* seconds */
83         long tv_usec;  /* microseconds */
84 };
85
86 struct __timezone {
87         int  tz_minuteswest; /* minutes W of Greenwich */
88         int  tz_dsttime;     /* type of dst correction */
89 };
90
91 static gettimeofday(struct timeval *t, struct __timezone *tz) {
92         
93         struct timeb current_time;
94
95         _ftime(&current_time);
96         
97         t->tv_sec  = current_time.time;
98         t->tv_usec = current_time.millitm * 1000;
99 }
100
101 #endif
102 /*
103  * normilize time as returned by gettimeofday. usec part must
104  * be always >= 0
105  */
106 static void normalize_time(struct timeval *t)
107 {
108         if(t->tv_usec < 0) {
109                 t->tv_sec--;
110                 t->tv_usec += 1000000L;
111         }
112 }
113
114 /* Local prototypes */
115 int LockRRD(FILE *rrd_file);
116 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
117                                         unsigned long *rra_current,
118                                         unsigned short CDP_scratch_idx, FILE *rrd_file);
119 int rrd_update_r(char *filename, char *template, int argc, char **argv);
120
121 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
122
123
124 #ifdef STANDALONE
125 int 
126 main(int argc, char **argv){
127         rrd_update(argc,argv);
128         if (rrd_test_error()) {
129                 printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
130                         "Usage: rrdupdate filename\n"
131                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
132                         "\t\t\ttime|N:value[:value...]\n\n"
133                         "\t\t\tat-time@value[:value...]\n\n"
134                         "\t\t\t[ time:value[:value...] ..]\n\n");
135                                    
136                 printf("ERROR: %s\n",rrd_get_error());
137                 rrd_clear_error();                                                            
138                 return 1;
139         }
140         return 0;
141 }
142 #endif
143
144 int
145 rrd_update(int argc, char **argv)
146 {
147     char             *template = NULL;          
148     int rc;
149
150     while (1) {
151                 static struct option long_options[] =
152                         {
153                                 {"template",      required_argument, 0, 't'},
154                                 {0,0,0,0}
155                         };
156                 int option_index = 0;
157                 int opt;
158                 opt = getopt_long(argc, argv, "t:", 
159                                                   long_options, &option_index);
160                 
161                 if (opt == EOF)
162                         break;
163                 
164                 switch(opt) {
165                 case 't':
166                         template = optarg;
167                         break;
168                         
169                 case '?':
170                         rrd_set_error("unknown option '%s'",argv[optind-1]);
171                         return(-1);
172                 }
173     }
174
175     /* need at least 2 arguments: filename, data. */
176     if (argc-optind < 2) {
177                 rrd_set_error("Not enough arguments");
178
179                 return -1;
180     }
181
182     rc = rrd_update_r(argv[optind], template,
183                       argc - optind - 1, argv + optind + 1);
184     return rc;
185 }
186
187 int
188 rrd_update_r(char *filename, char *template, int argc, char **argv)
189 {
190
191     int              arg_i = 2;
192     short            j;
193     unsigned long    i,ii,iii=1;
194
195     unsigned long    rra_begin;          /* byte pointer to the rra
196                                           * area in the rrd file.  this
197                                           * pointer never changes value */
198     unsigned long    rra_start;          /* byte pointer to the rra
199                                           * area in the rrd file.  this
200                                           * pointer changes as each rrd is
201                                           * processed. */
202     unsigned long    rra_current;        /* byte pointer to the current write
203                                           * spot in the rrd file. */
204     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
205     double           interval,
206         pre_int,post_int;                /* interval between this and
207                                           * the last run */
208     unsigned long    proc_pdp_st;        /* which pdp_st was the last
209                                           * to be processed */
210     unsigned long    occu_pdp_st;        /* when was the pdp_st
211                                           * before the last update
212                                           * time */
213     unsigned long    proc_pdp_age;       /* how old was the data in
214                                           * the pdp prep area when it
215                                           * was last updated */
216     unsigned long    occu_pdp_age;       /* how long ago was the last
217                                           * pdp_step time */
218     rrd_value_t      *pdp_new;           /* prepare the incoming data
219                                           * to be added the the
220                                           * existing entry */
221     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
222                                           * to be added the the
223                                           * cdp values */
224
225     long             *tmpl_idx;          /* index representing the settings
226                                             transported by the template index */
227     unsigned long    tmpl_cnt = 2;       /* time and data */
228
229     FILE             *rrd_file;
230     rrd_t            rrd;
231     time_t           current_time;
232     unsigned long    current_time_usec;  /* microseconds part of current time */
233     struct timeval   tmp_time;           /* used for time conversion */
234
235     char             **updvals;
236     int              schedule_smooth = 0;
237         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
238                                          /* a vector of future Holt-Winters seasonal coefs */
239     unsigned long    elapsed_pdp_st;
240                                          /* number of elapsed PDP steps since last update */
241     unsigned long    *rra_step_cnt = NULL;
242                                          /* number of rows to be updated in an RRA for a data
243                                           * value. */
244     unsigned long    start_pdp_offset;
245                                          /* number of PDP steps since the last update that
246                                           * are assigned to the first CDP to be generated
247                                           * since the last update. */
248     unsigned short   scratch_idx;
249                                          /* index into the CDP scratch array */
250     enum cf_en       current_cf;
251                                          /* numeric id of the current consolidation function */
252     rpnstack_t       rpnstack; /* used for COMPUTE DS */
253     int              version;  /* rrd version */
254
255     rpnstack_init(&rpnstack);
256
257     /* need at least 1 arguments: data. */
258     if (argc < 1) {
259         rrd_set_error("Not enough arguments");
260         return -1;
261     }
262     
263     
264
265     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
266         return -1;
267     }
268     /* initialize time */
269     version = atoi(rrd.stat_head->version);
270     gettimeofday(&tmp_time, 0);
271     normalize_time(&tmp_time);
272     current_time = tmp_time.tv_sec;
273     if(version >= 3) {
274         current_time_usec = tmp_time.tv_usec;
275     }
276     else {
277         current_time_usec = 0;
278     }
279
280     rra_current = rra_start = rra_begin = ftell(rrd_file);
281     /* This is defined in the ANSI C standard, section 7.9.5.3:
282
283         When a file is opened with udpate mode ('+' as the second
284         or third character in the ... list of mode argument
285         variables), both input and ouptut may be performed on the
286         associated stream.  However, ...  input may not be directly
287         followed by output without an intervening call to a file
288         positioning function, unless the input oepration encounters
289         end-of-file. */
290     fseek(rrd_file, 0, SEEK_CUR);
291
292     
293     /* get exclusive lock to whole file.
294      * lock gets removed when we close the file.
295      */
296     if (LockRRD(rrd_file) != 0) {
297       rrd_set_error("could not lock RRD");
298       rrd_free(&rrd);
299       fclose(rrd_file);
300       return(-1);   
301     } 
302
303     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
304         rrd_set_error("allocating updvals pointer array");
305         rrd_free(&rrd);
306         fclose(rrd_file);
307         return(-1);
308     }
309
310     if ((pdp_temp = malloc(sizeof(rrd_value_t)
311                            *rrd.stat_head->ds_cnt))==NULL){
312         rrd_set_error("allocating pdp_temp ...");
313         free(updvals);
314         rrd_free(&rrd);
315         fclose(rrd_file);
316         return(-1);
317     }
318
319     if ((tmpl_idx = malloc(sizeof(unsigned long)
320                            *(rrd.stat_head->ds_cnt+1)))==NULL){
321         rrd_set_error("allocating tmpl_idx ...");
322         free(pdp_temp);
323         free(updvals);
324         rrd_free(&rrd);
325         fclose(rrd_file);
326         return(-1);
327     }
328     /* initialize template redirector */
329     /* default config example (assume DS 1 is a CDEF DS)
330        tmpl_idx[0] -> 0; (time)
331        tmpl_idx[1] -> 1; (DS 0)
332        tmpl_idx[2] -> 3; (DS 2)
333        tmpl_idx[3] -> 4; (DS 3) */
334     tmpl_idx[0] = 0; /* time */
335     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
336         {
337            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
338               tmpl_idx[ii++]=i;
339         }
340     tmpl_cnt= ii;
341
342     if (template) {
343         char *dsname;
344         unsigned int tmpl_len;
345         dsname = template;
346         tmpl_cnt = 1; /* the first entry is the time */
347         tmpl_len = strlen(template);
348         for(i=0;i<=tmpl_len ;i++) {
349             if (template[i] == ':' || template[i] == '\0') {
350                 template[i] = '\0';
351                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
352                     rrd_set_error("Template contains more DS definitions than RRD");
353                     free(updvals); free(pdp_temp);
354                     free(tmpl_idx); rrd_free(&rrd);
355                     fclose(rrd_file); return(-1);
356                 }
357                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
358                     rrd_set_error("unknown DS name '%s'",dsname);
359                     free(updvals); free(pdp_temp);
360                     free(tmpl_idx); rrd_free(&rrd);
361                     fclose(rrd_file); return(-1);
362                 } else {
363                   /* the first element is always the time */
364                   tmpl_idx[tmpl_cnt-1]++; 
365                   /* go to the next entry on the template */
366                   dsname = &template[i+1];
367                   /* fix the damage we did before */
368                   if (i<tmpl_len) {
369                      template[i]=':';
370                   } 
371
372                 }
373             }       
374         }
375     }
376     if ((pdp_new = malloc(sizeof(rrd_value_t)
377                           *rrd.stat_head->ds_cnt))==NULL){
378         rrd_set_error("allocating pdp_new ...");
379         free(updvals);
380         free(pdp_temp);
381         free(tmpl_idx);
382         rrd_free(&rrd);
383         fclose(rrd_file);
384         return(-1);
385     }
386
387     /* loop through the arguments. */
388     for(arg_i=0; arg_i<argc;arg_i++) {
389         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
390         char *step_start = stepper;
391         char *p;
392         char *parsetime_error = NULL;
393         enum {atstyle, normal} timesyntax;
394         struct time_value ds_tv;
395         if (stepper == NULL){
396                 rrd_set_error("failed duplication argv entry");
397                 free(updvals);
398                 free(pdp_temp);  
399                 free(tmpl_idx);
400                 rrd_free(&rrd);
401                 fclose(rrd_file);
402                 return(-1);
403          }
404         /* initialize all ds input to unknown except the first one
405            which has always got to be set */
406         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
407         strcpy(stepper,argv[arg_i]);
408         updvals[0]=stepper;
409         /* separate all ds elements; first must be examined separately
410            due to alternate time syntax */
411         if ((p=strchr(stepper,'@'))!=NULL) {
412             timesyntax = atstyle;
413             *p = '\0';
414             stepper = p+1;
415         } else if ((p=strchr(stepper,':'))!=NULL) {
416             timesyntax = normal;
417             *p = '\0';
418             stepper = p+1;
419         } else {
420             rrd_set_error("expected timestamp not found in data source from %s:...",
421                           argv[arg_i]);
422             free(step_start);
423             break;
424         }
425         ii=1;
426         updvals[tmpl_idx[ii]] = stepper;
427         while (*stepper) {
428             if (*stepper == ':') {
429                 *stepper = '\0';
430                 ii++;
431                 if (ii<tmpl_cnt){                   
432                     updvals[tmpl_idx[ii]] = stepper+1;
433                 }
434             }
435             stepper++;
436         }
437
438         if (ii != tmpl_cnt-1) {
439             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
440                           tmpl_cnt-1, ii, argv[arg_i]);
441             free(step_start);
442             break;
443         }
444         
445         /* get the time from the reading ... handle N */
446         if (timesyntax == atstyle) {
447             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
448                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
449                 free(step_start);
450                 break;
451             }
452             if (ds_tv.type == RELATIVE_TO_END_TIME ||
453                 ds_tv.type == RELATIVE_TO_START_TIME) {
454                 rrd_set_error("specifying time relative to the 'start' "
455                               "or 'end' makes no sense here: %s",
456                               updvals[0]);
457                 free(step_start);
458                 break;
459             }
460
461             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
462             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
463             
464         } else if (strcmp(updvals[0],"N")==0){
465             gettimeofday(&tmp_time, 0);
466             normalize_time(&tmp_time);
467             current_time = tmp_time.tv_sec;
468             current_time_usec = tmp_time.tv_usec;
469         } else {
470             double tmp;
471             tmp = strtod(updvals[0], 0);
472             current_time = floor(tmp);
473             current_time_usec = (long)((tmp - current_time) * 1000000L);
474         }
475         /* dont do any correction for old version RRDs */
476         if(version < 3) 
477             current_time_usec = 0;
478         
479         if(current_time <= rrd.live_head->last_up){
480             rrd_set_error("illegal attempt to update using time %ld when "
481                           "last update time is %ld (minimum one second step)",
482                           current_time, rrd.live_head->last_up);
483             free(step_start);
484             break;
485         }
486         
487         
488         /* seek to the beginning of the rra's */
489         if (rra_current != rra_begin) {
490             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
491                 rrd_set_error("seek error in rrd");
492                 free(step_start);
493                 break;
494             }
495             rra_current = rra_begin;
496         }
497         rra_start = rra_begin;
498
499         /* when was the current pdp started */
500         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
501         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
502
503         /* when did the last pdp_st occur */
504         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
505         occu_pdp_st = current_time - occu_pdp_age;
506         /* interval = current_time - rrd.live_head->last_up; */
507         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
508     
509         if (occu_pdp_st > proc_pdp_st){
510             /* OK we passed the pdp_st moment*/
511             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
512                                                               * occurred before the latest
513                                                               * pdp_st moment*/
514             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
515             post_int = occu_pdp_age;                         /* how much after it */
516             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
517         } else {
518             pre_int = interval;
519             post_int = 0;
520         }
521
522 #ifdef DEBUG
523         printf(
524                "proc_pdp_age %lu\t"
525                "proc_pdp_st %lu\t" 
526                "occu_pfp_age %lu\t" 
527                "occu_pdp_st %lu\t"
528                "int %lf\t"
529                "pre_int %lf\t"
530                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
531                 occu_pdp_age, occu_pdp_st,
532                interval, pre_int, post_int);
533 #endif
534     
535         /* process the data sources and update the pdp_prep 
536          * area accordingly */
537         for(i=0;i<rrd.stat_head->ds_cnt;i++){
538             enum dst_en dst_idx;
539             dst_idx= dst_conv(rrd.ds_def[i].dst);
540                 /* NOTE: DST_CDEF should never enter this if block, because
541                  * updvals[i+1][0] is initialized to 'U'; unless the caller
542                  * accidently specified a value for the DST_CDEF. To handle 
543                  * this case, an extra check is required. */
544             if((updvals[i+1][0] != 'U') &&
545                    (dst_idx != DST_CDEF) &&
546                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
547                double rate = DNAN;
548                /* the data source type defines how to process the data */
549                 /* pdp_new contains rate * time ... eg the bytes
550                  * transferred during the interval. Doing it this way saves
551                  * a lot of math operations */
552                 
553
554                 switch(dst_idx){
555                 case DST_COUNTER:
556                 case DST_DERIVE:
557                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
558                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
559                        if(dst_idx == DST_COUNTER) {
560                           /* simple overflow catcher sugestet by andres kroonmaa */
561                           /* this will fail terribly for non 32 or 64 bit counters ... */
562                           /* are there any others in SNMP land ? */
563                           if (pdp_new[i] < (double)0.0 ) 
564                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
565                           if (pdp_new[i] < (double)0.0 ) 
566                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
567                        }
568                        rate = pdp_new[i] / interval;
569                     }
570                    else {
571                      pdp_new[i]= DNAN;          
572                    }
573                    break;
574                 case DST_ABSOLUTE:
575                     pdp_new[i]= atof(updvals[i+1]);
576                     rate = pdp_new[i] / interval;                 
577                     break;
578                 case DST_GAUGE:
579                     pdp_new[i] = atof(updvals[i+1]) * interval;
580                     rate = pdp_new[i] / interval;                  
581                     break;
582                 default:
583                     rrd_set_error("rrd contains unknown DS type : '%s'",
584                                   rrd.ds_def[i].dst);
585                     break;
586                 }
587                 /* break out of this for loop if the error string is set */
588                 if (rrd_test_error()){
589                     break;
590                 }
591                /* make sure pdp_temp is neither too large or too small
592                 * if any of these occur it becomes unknown ...
593                 * sorry folks ... */
594                if ( ! isnan(rate) && 
595                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
596                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
597                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
598                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
599                   pdp_new[i] = DNAN;
600                }               
601             } else {
602                 /* no news is news all the same */
603                 pdp_new[i] = DNAN;
604             }
605             
606             /* make a copy of the command line argument for the next run */
607 #ifdef DEBUG
608             fprintf(stderr,
609                     "prep ds[%lu]\t"
610                     "last_arg '%s'\t"
611                     "this_arg '%s'\t"
612                     "pdp_new %10.2f\n",
613                     i,
614                     rrd.pdp_prep[i].last_ds,
615                     updvals[i+1], pdp_new[i]);
616 #endif
617             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
618                 strncpy(rrd.pdp_prep[i].last_ds,
619                         updvals[i+1],LAST_DS_LEN-1);
620                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
621             }
622         }
623         /* break out of the argument parsing loop if the error_string is set */
624         if (rrd_test_error()){
625             free(step_start);
626             break;
627         }
628         /* has a pdp_st moment occurred since the last run ? */
629
630         if (proc_pdp_st == occu_pdp_st){
631             /* no we have not passed a pdp_st moment. therefore update is simple */
632
633             for(i=0;i<rrd.stat_head->ds_cnt;i++){
634                 if(isnan(pdp_new[i]))
635                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
636                 else
637                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
638 #ifdef DEBUG
639                 fprintf(stderr,
640                         "NO PDP  ds[%lu]\t"
641                         "value %10.2f\t"
642                         "unkn_sec %5lu\n",
643                         i,
644                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
645                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
646 #endif
647             }   
648         } else {
649             /* an pdp_st has occurred. */
650
651             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
652              * occurred up to the last run.        
653             pdp_new[] contains rate*seconds from the latest run.
654             pdp_temp[] will contain the rate for cdp */
655
656             for(i=0;i<rrd.stat_head->ds_cnt;i++){
657                 /* update pdp_prep to the current pdp_st */
658                 if(isnan(pdp_new[i]))
659                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
660                 else
661                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
662                         pdp_new[i]/(double)interval*(double)pre_int;
663
664                 /* if too much of the pdp_prep is unknown we dump it */
665                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
666                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
667                     (occu_pdp_st-proc_pdp_st <= 
668                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
669                     pdp_temp[i] = DNAN;
670                 } else {
671                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
672                         / (double)( occu_pdp_st
673                                    - proc_pdp_st
674                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
675                 }
676
677                 /* process CDEF data sources; remember each CDEF DS can
678                  * only reference other DS with a lower index number */
679             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
680                    rpnp_t *rpnp;
681                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
682                    /* substitue data values for OP_VARIABLE nodes */
683                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
684                    {
685                           if (rpnp[ii].op == OP_VARIABLE) {
686                                  rpnp[ii].op = OP_NUMBER;
687                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
688                           }
689                    }
690                    /* run the rpn calculator */
691                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
692                           free(rpnp);
693                           break; /* exits the data sources pdp_temp loop */
694                    }
695                 }
696         
697                 /* make pdp_prep ready for the next run */
698                 if(isnan(pdp_new[i])){
699                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
700                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
701                 } else {
702                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
703                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
704                         pdp_new[i]/(double)interval*(double)post_int;
705                 }
706
707 #ifdef DEBUG
708                 fprintf(stderr,
709                         "PDP UPD ds[%lu]\t"
710                         "pdp_temp %10.2f\t"
711                         "new_prep %10.2f\t"
712                         "new_unkn_sec %5lu\n",
713                         i, pdp_temp[i],
714                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
715                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
716 #endif
717             }
718
719                 /* if there were errors during the last loop, bail out here */
720             if (rrd_test_error()){
721                free(step_start);
722                break;
723             }
724
725                 /* compute the number of elapsed pdp_st moments */
726                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
727 #ifdef DEBUG
728                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
729 #endif
730                 if (rra_step_cnt == NULL)
731                 {
732                    rra_step_cnt = (unsigned long *) 
733                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
734                 }
735
736             for(i = 0, rra_start = rra_begin;
737                 i < rrd.stat_head->rra_cnt;
738             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
739                 i++)
740                 {
741                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
742                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
743                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
744         if (start_pdp_offset <= elapsed_pdp_st) {
745            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
746                       rrd.rra_def[i].pdp_cnt + 1;
747             } else {
748                    rra_step_cnt[i] = 0;
749                 }
750
751                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
752                 {
753                    /* If this is a bulk update, we need to skip ahead in the seasonal
754                         * arrays so that they will be correct for the next observed value;
755                         * note that for the bulk update itself, no update will occur to
756                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
757                         * be set to DNAN. */
758            if (rra_step_cnt[i] > 2) 
759                    {
760                           /* skip update by resetting rra_step_cnt[i],
761                            * note that this is not data source specific; this is due
762                            * to the bulk update, not a DNAN value for the specific data
763                            * source. */
764                           rra_step_cnt[i] = 0;
765               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
766                              &last_seasonal_coef);
767                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
768                              &seasonal_coef);
769                    }
770                 
771                   /* periodically run a smoother for seasonal effects */
772                   /* Need to use first cdp parameter buffer to track
773                    * burnin (burnin requires a specific smoothing schedule).
774                    * The CDP_init_seasonal parameter is really an RRA level,
775                    * not a data source within RRA level parameter, but the rra_def
776                    * is read only for rrd_update (not flushed to disk). */
777                   iii = i*(rrd.stat_head -> ds_cnt);
778                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
779                           <= BURNIN_CYCLES)
780                   {
781                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
782                                  > rrd.rra_def[i].row_cnt - 1) {
783                            /* mark off one of the burnin cycles */
784                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
785                        schedule_smooth = 1;
786                          }  
787                   } else {
788                          /* someone has no doubt invented a trick to deal with this
789                           * wrap around, but at least this code is clear. */
790                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
791                              rrd.rra_ptr[i].cur_row)
792                          {
793                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
794                                   * mapping between PDP and CDP */
795                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
796                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
797                                  {
798 #ifdef DEBUG
799                                         fprintf(stderr,
800                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
801                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
802                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
803 #endif
804                                         schedule_smooth = 1;
805                                  }
806              } else {
807                                  /* can't rely on negative numbers because we are working with
808                                   * unsigned values */
809                                  /* Don't need modulus here. If we've wrapped more than once, only
810                                   * one smooth is executed at the end. */
811                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
812                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
813                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
814                                  {
815 #ifdef DEBUG
816                                         fprintf(stderr,
817                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
818                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
819                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
820 #endif
821                                         schedule_smooth = 1;
822                                  }
823                          }
824                   }
825
826               rra_current = ftell(rrd_file); 
827                 } /* if cf is DEVSEASONAL or SEASONAL */
828
829         if (rrd_test_error()) break;
830
831                     /* update CDP_PREP areas */
832                     /* loop over data soures within each RRA */
833                     for(ii = 0;
834                         ii < rrd.stat_head->ds_cnt;
835                         ii++)
836                         {
837                         
838                         /* iii indexes the CDP prep area for this data source within the RRA */
839                         iii=i*rrd.stat_head->ds_cnt+ii;
840
841                         if (rrd.rra_def[i].pdp_cnt > 1) {
842                           
843                            if (rra_step_cnt[i] > 0) {
844                            /* If we are in this block, as least 1 CDP value will be written to
845                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
846                                 * to be written, then the "fill in" value is the CDP_secondary_val
847                                 * entry. */
848                                   if (isnan(pdp_temp[ii]))
849                   {
850                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
851                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
852                                   } else {
853                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
854                                           * CDP data entries. No matter the CF, the value is the same because
855                                           * the average, max, min, and last of a list of identical values is
856                                           * the same, namely, the value itself. */
857                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
858                                   }
859                      
860                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
861                                       > rrd.rra_def[i].pdp_cnt*
862                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
863                                   {
864                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
865                                          /* initialize carry over */
866                                          if (current_cf == CF_AVERAGE) {
867                                                    if (isnan(pdp_temp[ii])) { 
868                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
869                                                    } else {
870                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
871                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
872                                                    }
873                                          } else {
874                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
875                                          }
876                                   } else {
877                                          rrd_value_t cum_val, cur_val; 
878                                      switch (current_cf) {
879                                                 case CF_AVERAGE:
880                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
881                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
882                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
883                                                (cum_val + cur_val * start_pdp_offset) /
884                                            (rrd.rra_def[i].pdp_cnt
885                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
886                                                    /* initialize carry over value */
887                                                    if (isnan(pdp_temp[ii])) { 
888                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
889                                                    } else {
890                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
891                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
892                                                    }
893                                                    break;
894                                                 case CF_MAXIMUM:
895                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
896                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
897 #ifdef DEBUG
898                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
899                                                           isnan(pdp_temp[ii])) {
900                                                      fprintf(stderr,
901                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
902                                                                 i,ii);
903                                                          exit(-1);
904                                                   }
905 #endif
906                                                   if (cur_val > cum_val)
907                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
908                                                   else
909                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
910                                                   /* initialize carry over value */
911                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
912                                                   break;
913                                                 case CF_MINIMUM:
914                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
915                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
916 #ifdef DEBUG
917                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
918                                                           isnan(pdp_temp[ii])) {
919                                                      fprintf(stderr,
920                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
921                                                                 i,ii);
922                                                          exit(-1);
923                                                   }
924 #endif
925                                                   if (cur_val < cum_val)
926                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
927                                                   else
928                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
929                                                   /* initialize carry over value */
930                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
931                                                   break;
932                                                 case CF_LAST:
933                                                 default:
934                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
935                                                    /* initialize carry over value */
936                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
937                                                 break;
938                                          }
939                                   } /* endif meets xff value requirement for a valid value */
940                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
941                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
942                                   if (isnan(pdp_temp[ii]))
943                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
944                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
945                                   else
946                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
947                } else  /* rra_step_cnt[i]  == 0 */
948                            {
949 #ifdef DEBUG
950                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
951                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
952                                          i,ii);
953                                   } else {
954                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
955                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
956                                   }
957 #endif
958                                   if (isnan(pdp_temp[ii])) {
959                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
960                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
961                                   {
962                                          if (current_cf == CF_AVERAGE) {
963                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
964                                                    elapsed_pdp_st;
965                                          } else {
966                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
967                                          }
968 #ifdef DEBUG
969                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
970                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
971 #endif
972                                   } else {
973                                          switch (current_cf) {
974                                          case CF_AVERAGE:
975                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
976                                                    elapsed_pdp_st;
977                                                 break;
978                                          case CF_MINIMUM:
979                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
980                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
981                                                 break; 
982                                          case CF_MAXIMUM:
983                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
984                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
985                                                 break; 
986                                          case CF_LAST:
987                                          default:
988                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
989                                                 break;
990                                          }
991                                   }
992                            }
993                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
994                            if (elapsed_pdp_st > 2)
995                            {
996                                    switch (current_cf) {
997                                    case CF_AVERAGE:
998                                    default:
999                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1000                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1001                                           break;
1002                    case CF_SEASONAL:
1003                                    case CF_DEVSEASONAL:
1004                                           /* need to update cached seasonal values, so they are consistent
1005                                            * with the bulk update */
1006                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1007                                            * CDP_last_deviation are the same. */
1008                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1009                                                  last_seasonal_coef[ii];
1010                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1011                                                  seasonal_coef[ii];
1012                                           break;
1013                    case CF_HWPREDICT:
1014                                           /* need to update the null_count and last_null_count.
1015                                            * even do this for non-DNAN pdp_temp because the
1016                                            * algorithm is not learning from batch updates. */
1017                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1018                                                  elapsed_pdp_st;
1019                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1020                                                  elapsed_pdp_st - 1;
1021                                           /* fall through */
1022                                    case CF_DEVPREDICT:
1023                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1024                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1025                                           break;
1026                    case CF_FAILURES:
1027                                           /* do not count missed bulk values as failures */
1028                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1029                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1030                                           /* need to reset violations buffer.
1031                                            * could do this more carefully, but for now, just
1032                                            * assume a bulk update wipes away all violations. */
1033                       erase_violations(&rrd, iii, i);
1034                                           break;
1035                                    }
1036                            } 
1037                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1038
1039                         if (rrd_test_error()) break;
1040
1041                         } /* endif data sources loop */
1042         } /* end RRA Loop */
1043
1044                 /* this loop is only entered if elapsed_pdp_st < 3 */
1045                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1046                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1047                 {
1048                for(i = 0, rra_start = rra_begin;
1049                    i < rrd.stat_head->rra_cnt;
1050                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1051                    i++)
1052                    {
1053                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1054
1055                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1056                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1057                           {
1058                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1059                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1060                                 &seasonal_coef);
1061                  rra_current = ftell(rrd_file);
1062                           }
1063                           if (rrd_test_error()) break;
1064                       /* loop over data soures within each RRA */
1065                       for(ii = 0;
1066                           ii < rrd.stat_head->ds_cnt;
1067                           ii++)
1068                           {
1069                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1070                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1071                                     scratch_idx, seasonal_coef);
1072                           }
1073            } /* end RRA Loop */
1074                    if (rrd_test_error()) break;
1075             } /* end elapsed_pdp_st loop */
1076
1077                 if (rrd_test_error()) break;
1078
1079                 /* Ready to write to disk */
1080                 /* Move sequentially through the file, writing one RRA at a time.
1081                  * Note this architecture divorces the computation of CDP with
1082                  * flushing updated RRA entries to disk. */
1083             for(i = 0, rra_start = rra_begin;
1084                 i < rrd.stat_head->rra_cnt;
1085             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1086                 i++) {
1087                 /* is there anything to write for this RRA? If not, continue. */
1088         if (rra_step_cnt[i] == 0) continue;
1089
1090                 /* write the first row */
1091 #ifdef DEBUG
1092         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1093 #endif
1094             rrd.rra_ptr[i].cur_row++;
1095             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1096                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1097                 /* positition on the first row */
1098                 rra_pos_tmp = rra_start +
1099                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1100                 if(rra_pos_tmp != rra_current) {
1101                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1102                       rrd_set_error("seek error in rrd");
1103                       break;
1104                    }
1105                    rra_current = rra_pos_tmp;
1106                 }
1107
1108 #ifdef DEBUG
1109             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1110 #endif
1111                 scratch_idx = CDP_primary_val;
1112                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1113                 if (rrd_test_error()) break;
1114
1115                 /* write other rows of the bulk update, if any */
1116                 scratch_idx = CDP_secondary_val;
1117                 for ( ; rra_step_cnt[i] > 1; 
1118                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1119                 {
1120                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1121                    {
1122 #ifdef DEBUG
1123               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1124                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1125 #endif
1126                           /* wrap */
1127                           rrd.rra_ptr[i].cur_row = 0;
1128                           /* seek back to beginning of current rra */
1129                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1130                           {
1131                          rrd_set_error("seek error in rrd");
1132                          break;
1133                           }
1134 #ifdef DEBUG
1135                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1136 #endif
1137                           rra_current = rra_start;
1138                    }
1139                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1140                 }
1141                 
1142                 if (rrd_test_error())
1143                   break;
1144                 } /* RRA LOOP */
1145
1146             /* break out of the argument parsing loop if error_string is set */
1147             if (rrd_test_error()){
1148                    free(step_start);
1149                    break;
1150             } 
1151             
1152         } /* endif a pdp_st has occurred */ 
1153         rrd.live_head->last_up = current_time;
1154         rrd.live_head->last_up_usec = current_time_usec; 
1155         free(step_start);
1156     } /* function argument loop */
1157
1158     if (seasonal_coef != NULL) free(seasonal_coef);
1159     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1160         if (rra_step_cnt != NULL) free(rra_step_cnt);
1161     rpnstack_free(&rpnstack);
1162
1163     /* if we got here and if there is an error and if the file has not been
1164      * written to, then close things up and return. */
1165     if (rrd_test_error()) {
1166         free(updvals);
1167         free(tmpl_idx);
1168         rrd_free(&rrd);
1169         free(pdp_temp);
1170         free(pdp_new);
1171         fclose(rrd_file);
1172         return(-1);
1173     }
1174
1175     /* aargh ... that was tough ... so many loops ... anyway, its done.
1176      * we just need to write back the live header portion now*/
1177
1178     if (fseek(rrd_file, (sizeof(stat_head_t)
1179                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1180                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1181               SEEK_SET) != 0) {
1182         rrd_set_error("seek rrd for live header writeback");
1183         free(updvals);
1184         free(tmpl_idx);
1185         rrd_free(&rrd);
1186         free(pdp_temp);
1187         free(pdp_new);
1188         fclose(rrd_file);
1189         return(-1);
1190     }
1191
1192     if(version >= 3) {
1193             if(fwrite( rrd.live_head,
1194                        sizeof(live_head_t), 1, rrd_file) != 1){
1195                 rrd_set_error("fwrite live_head to rrd");
1196                 free(updvals);
1197                 rrd_free(&rrd);
1198                 free(tmpl_idx);
1199                 free(pdp_temp);
1200                 free(pdp_new);
1201                 fclose(rrd_file);
1202                 return(-1);
1203             }
1204     }
1205     else {
1206             if(fwrite( &rrd.live_head->last_up,
1207                        sizeof(time_t), 1, rrd_file) != 1){
1208                 rrd_set_error("fwrite live_head to rrd");
1209                 free(updvals);
1210                 rrd_free(&rrd);
1211                 free(tmpl_idx);
1212                 free(pdp_temp);
1213                 free(pdp_new);
1214                 fclose(rrd_file);
1215                 return(-1);
1216             }
1217     }
1218             
1219
1220     if(fwrite( rrd.pdp_prep,
1221                sizeof(pdp_prep_t),
1222                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1223         rrd_set_error("ftwrite pdp_prep to rrd");
1224         free(updvals);
1225         rrd_free(&rrd);
1226         free(tmpl_idx);
1227         free(pdp_temp);
1228         free(pdp_new);
1229         fclose(rrd_file);
1230         return(-1);
1231     }
1232
1233     if(fwrite( rrd.cdp_prep,
1234                sizeof(cdp_prep_t),
1235                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1236        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1237
1238         rrd_set_error("ftwrite cdp_prep to rrd");
1239         free(updvals);
1240         free(tmpl_idx);
1241         rrd_free(&rrd);
1242         free(pdp_temp);
1243         free(pdp_new);
1244         fclose(rrd_file);
1245         return(-1);
1246     }
1247
1248     if(fwrite( rrd.rra_ptr,
1249                sizeof(rra_ptr_t), 
1250                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1251         rrd_set_error("fwrite rra_ptr to rrd");
1252         free(updvals);
1253         free(tmpl_idx);
1254         rrd_free(&rrd);
1255         free(pdp_temp);
1256         free(pdp_new);
1257         fclose(rrd_file);
1258         return(-1);
1259     }
1260
1261     /* OK now close the files and free the memory */
1262     if(fclose(rrd_file) != 0){
1263         rrd_set_error("closing rrd");
1264         free(updvals);
1265         free(tmpl_idx);
1266         rrd_free(&rrd);
1267         free(pdp_temp);
1268         free(pdp_new);
1269         return(-1);
1270     }
1271
1272     /* calling the smoothing code here guarantees at most
1273          * one smoothing operation per rrd_update call. Unfortunately,
1274          * it is possible with bulk updates, or a long-delayed update
1275          * for smoothing to occur off-schedule. This really isn't
1276          * critical except during the burning cycles. */
1277         if (schedule_smooth)
1278         {
1279 #ifndef WIN32
1280           rrd_file = fopen(filename,"r+");
1281 #else
1282           rrd_file = fopen(filename,"rb+");
1283 #endif
1284           rra_start = rra_begin;
1285           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1286           {
1287             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1288                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1289             {
1290 #ifdef DEBUG
1291               fprintf(stderr,"Running smoother for rra %ld\n",i);
1292 #endif
1293               apply_smoother(&rrd,i,rra_start,rrd_file);
1294               if (rrd_test_error())
1295                 break;
1296             }
1297             rra_start += rrd.rra_def[i].row_cnt
1298               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1299           }
1300           fclose(rrd_file);
1301         }
1302     rrd_free(&rrd);
1303     free(updvals);
1304     free(tmpl_idx);
1305     free(pdp_new);
1306     free(pdp_temp);
1307     return(0);
1308 }
1309
1310 /*
1311  * get exclusive lock to whole file.
1312  * lock gets removed when we close the file
1313  *
1314  * returns 0 on success
1315  */
1316 int
1317 LockRRD(FILE *rrdfile)
1318 {
1319     int rrd_fd;         /* File descriptor for RRD */
1320     int                 stat;
1321
1322     rrd_fd = fileno(rrdfile);
1323
1324         {
1325 #ifndef WIN32    
1326                 struct flock    lock;
1327     lock.l_type = F_WRLCK;    /* exclusive write lock */
1328     lock.l_len = 0;           /* whole file */
1329     lock.l_start = 0;         /* start of file */
1330     lock.l_whence = SEEK_SET;   /* end of file */
1331
1332     stat = fcntl(rrd_fd, F_SETLK, &lock);
1333 #else
1334                 struct _stat st;
1335
1336                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1337                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1338                 } else {
1339                         stat = -1;
1340                 }
1341 #endif
1342         }
1343
1344     return(stat);
1345 }
1346
1347
1348 void
1349 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1350                unsigned short CDP_scratch_idx, FILE *rrd_file)
1351 {
1352    unsigned long ds_idx, cdp_idx;
1353
1354    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1355    {
1356       /* compute the cdp index */
1357       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1358 #ifdef DEBUG
1359           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1360              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1361              rrd -> rra_def[rra_idx].cf_nam);
1362 #endif 
1363
1364           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1365                  sizeof(rrd_value_t),1,rrd_file) != 1)
1366           { 
1367              rrd_set_error("writing rrd");
1368                  return;
1369           }
1370           *rra_current += sizeof(rrd_value_t);
1371         }
1372 }