Alternate update interface, updatev. Returns info about CDPs written to disk as resul...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.9  2003/04/25 18:35:08  jake
9  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
10  *
11  * Revision 1.8  2003/03/31 21:22:12  oetiker
12  * enables RRDtool updates with microsecond or in case of windows millisecond
13  * precision. This is needed to reduce time measurement error when archive step
14  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
15  *
16  * Revision 1.7  2003/02/13 07:05:27  oetiker
17  * Find attached the patch I promised to send to you. Please note that there
18  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
19  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
20  * library is identical to librrd, but it contains support code for per-thread
21  * global variables currently used for error information only. This is similar
22  * to how errno per-thread variables are implemented.  librrd_th must be linked
23  * alongside of libpthread
24  *
25  * There is also a new file "THREADS", holding some documentation.
26  *
27  * -- Peter Stamfest <peter@stamfest.at>
28  *
29  * Revision 1.6  2002/02/01 20:34:49  oetiker
30  * fixed version number and date/time
31  *
32  * Revision 1.5  2001/05/09 05:31:01  oetiker
33  * Bug fix: when update of multiple PDP/CDP RRAs coincided
34  * with interpolation of multiple PDPs an incorrect value was
35  * stored as the CDP. Especially evident for GAUGE data sources.
36  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
37  *
38  * Revision 1.4  2001/03/10 23:54:41  oetiker
39  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
40  * parser and calculator from rrd_graph and puts them in a new file,
41  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
42  * clean-up of aberrant behavior stuff, including a bug fix.
43  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
44  * -- Jake Brutlag <jakeb@corp.webtv.net>
45  *
46  * Revision 1.3  2001/03/04 13:01:55  oetiker
47  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
48  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
49  * This is backwards compatible! But new files using the Aberrant stuff are not readable
50  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
51  * -- Jake Brutlag <jakeb@corp.webtv.net>
52  *
53  * Revision 1.2  2001/03/04 11:14:25  oetiker
54  * added at-style-time@value:value syntax to rrd_update
55  * --  Dave Bodenstab <imdave@mcs.net>
56  *
57  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
58  * checkin
59  *
60  *****************************************************************************/
61
62 #include "rrd_tool.h"
63 #include <sys/types.h>
64 #include <fcntl.h>
65
66 #ifdef WIN32
67  #include <sys/locking.h>
68  #include <sys/stat.h>
69  #include <io.h>
70 #endif
71
72 #include "rrd_hw.h"
73 #include "rrd_rpncalc.h"
74
75 #include "rrd_is_thread_safe.h"
76
77 #ifdef WIN32
78 /*
79  * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
80  * replacement.
81  */
82 #include <sys/timeb.h>
83
/* Replacement for the time value structure that WIN32 does not provide. */
struct timeval {
    time_t tv_sec;      /* whole seconds */
    long   tv_usec;     /* microseconds */
};
88
/* Time zone description matching the second gettimeofday() argument. */
struct __timezone {
    int tz_minuteswest;     /* minutes W of Greenwich */
    int tz_dsttime;         /* type of dst correction */
};
93
94 static gettimeofday(struct timeval *t, struct __timezone *tz) {
95         
96         struct timeb current_time;
97
98         _ftime(&current_time);
99         
100         t->tv_sec  = current_time.time;
101         t->tv_usec = current_time.millitm * 1000;
102 }
103
104 #endif
/*
 * Normalize a time value as returned by gettimeofday(): on return the
 * usec part is always >= 0.
 *
 * When tv_usec is negative, whole seconds are borrowed from tv_sec until
 * tv_usec is non-negative. This also covers values below -1000000, which
 * need more than one second borrowed. Non-negative tv_usec values are
 * left untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec < 0) {
                long usec = (long)t->tv_usec;
                /* smallest number of seconds that lifts usec to >= 0 */
                long borrow = (999999L - usec) / 1000000L;

                t->tv_sec  -= borrow;
                t->tv_usec += borrow * 1000000L;
        }
}
116
117 /* Local prototypes */
118 int LockRRD(FILE *rrd_file);
119 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
120                                         unsigned long *rra_current,
121                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
122                                         info_t *pcdp_summary, time_t *rra_time);
123 int rrd_update_r(char *filename, char *template, int argc, char **argv);
124 int _rrd_update(char *filename, char *template, int argc, char **argv, 
125                                         info_t*);
126
/*
 * IFDNAN(X,Y) yields X, or Y when X is NaN.
 *
 * The expansion is a plain parenthesized expression with no trailing
 * semicolon, so it can be used inside larger expressions; the statement
 * using it supplies its own ';'.
 * NOTE: X appears more than once in the expansion, so do not pass an
 * argument with side effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
128
129
130 #ifdef STANDALONE
/*
 * Entry point of the stand-alone rrdupdate program.
 *
 * Hands the command line to rrd_update(). If the rrd error state is set
 * afterwards, prints the usage text followed by the error message, clears
 * the error and exits with status 1; otherwise exits with status 0.
 */
int
main(int argc, char **argv)
{
    static const char usage[] =
        "RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    /* success is judged by the error state, not by the return value */
    if (!rrd_test_error()) {
        return 0;
    }

    printf("%s", usage);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
148 #endif
149
150 info_t *rrd_update_v(int argc, char **argv)
151 {
152     char             *template = NULL;          
153         info_t *result = NULL;
154         infoval rc;
155
156     while (1) {
157                 static struct option long_options[] =
158                         {
159                                 {"template",      required_argument, 0, 't'},
160                                 {0,0,0,0}
161                         };
162                 int option_index = 0;
163                 int opt;
164                 opt = getopt_long(argc, argv, "t:", 
165                                                   long_options, &option_index);
166                 
167                 if (opt == EOF)
168                         break;
169                 
170                 switch(opt) {
171                 case 't':
172                         template = optarg;
173                         break;
174                 
175                 case '?':
176                         rrd_set_error("unknown option '%s'",argv[optind-1]);
177             rc.u_int = -1;
178                         goto end_tag;
179                 }
180     }
181
182     /* need at least 2 arguments: filename, data. */
183     if (argc-optind < 2) {
184                 rrd_set_error("Not enough arguments");
185         rc.u_int = -1;
186                 goto end_tag;
187     }
188     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
189         rc.u_int = _rrd_update(argv[optind], template,
190                       argc - optind - 1, argv + optind + 1, result);
191     result->value.u_int = rc.u_int;
192 end_tag:
193     return result;
194 }
195
/*
 * Command-line style front end for updating an RRD.
 *
 * argc/argv  [--template|-t ds-name:...] filename data [data ...]
 *
 * Returns -1 with the rrd error set when an unknown option is seen or
 * when fewer than two non-option arguments remain; otherwise returns
 * whatever rrd_update_r() returns for the filename, the optional
 * template and the remaining data arguments.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *template = NULL;
    int   option_index;
    int   opt;

    for (;;) {
        option_index = 0;
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF) {
            break;
        }

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
    }

    /* filename and at least one data argument must be left over */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
238
/*
 * Update interface taking already-parsed arguments.
 *
 * Forwards filename, template and the argc/argv data arguments to
 * _rrd_update() with no summary list (NULL) and returns its result.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
244
245 int
246 _rrd_update(char *filename, char *template, int argc, char **argv, 
247    info_t *pcdp_summary)
248 {
249
250     int              arg_i = 2;
251     short            j;
252     unsigned long    i,ii,iii=1;
253
254     unsigned long    rra_begin;          /* byte pointer to the rra
255                                           * area in the rrd file.  this
256                                           * pointer never changes value */
257     unsigned long    rra_start;          /* byte pointer to the rra
258                                           * area in the rrd file.  this
259                                           * pointer changes as each rrd is
260                                           * processed. */
261     unsigned long    rra_current;        /* byte pointer to the current write
262                                           * spot in the rrd file. */
263     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
264     double           interval,
265         pre_int,post_int;                /* interval between this and
266                                           * the last run */
267     unsigned long    proc_pdp_st;        /* which pdp_st was the last
268                                           * to be processed */
269     unsigned long    occu_pdp_st;        /* when was the pdp_st
270                                           * before the last update
271                                           * time */
272     unsigned long    proc_pdp_age;       /* how old was the data in
273                                           * the pdp prep area when it
274                                           * was last updated */
275     unsigned long    occu_pdp_age;       /* how long ago was the last
276                                           * pdp_step time */
277     rrd_value_t      *pdp_new;           /* prepare the incoming data
278                                           * to be added the the
279                                           * existing entry */
280     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
281                                           * to be added the the
282                                           * cdp values */
283
284     long             *tmpl_idx;          /* index representing the settings
285                                             transported by the template index */
286     unsigned long    tmpl_cnt = 2;       /* time and data */
287
288     FILE             *rrd_file;
289     rrd_t            rrd;
290     time_t           current_time;
291         time_t           rra_time; /* time of update for a RRA */
292     unsigned long    current_time_usec;  /* microseconds part of current time */
293     struct timeval   tmp_time;           /* used for time conversion */
294
295     char             **updvals;
296     int              schedule_smooth = 0;
297         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
298                                          /* a vector of future Holt-Winters seasonal coefs */
299     unsigned long    elapsed_pdp_st;
300                                          /* number of elapsed PDP steps since last update */
301     unsigned long    *rra_step_cnt = NULL;
302                                          /* number of rows to be updated in an RRA for a data
303                                           * value. */
304     unsigned long    start_pdp_offset;
305                                          /* number of PDP steps since the last update that
306                                           * are assigned to the first CDP to be generated
307                                           * since the last update. */
308     unsigned short   scratch_idx;
309                                          /* index into the CDP scratch array */
310     enum cf_en       current_cf;
311                                          /* numeric id of the current consolidation function */
312     rpnstack_t       rpnstack; /* used for COMPUTE DS */
313     int              version;  /* rrd version */
314
315     rpnstack_init(&rpnstack);
316
317     /* need at least 1 arguments: data. */
318     if (argc < 1) {
319         rrd_set_error("Not enough arguments");
320         return -1;
321     }
322     
323     
324
325     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
326         return -1;
327     }
328     /* initialize time */
329     version = atoi(rrd.stat_head->version);
330     gettimeofday(&tmp_time, 0);
331     normalize_time(&tmp_time);
332     current_time = tmp_time.tv_sec;
333     if(version >= 3) {
334         current_time_usec = tmp_time.tv_usec;
335     }
336     else {
337         current_time_usec = 0;
338     }
339
340     rra_current = rra_start = rra_begin = ftell(rrd_file);
341     /* This is defined in the ANSI C standard, section 7.9.5.3:
342
343         When a file is opened with udpate mode ('+' as the second
344         or third character in the ... list of mode argument
345         variables), both input and ouptut may be performed on the
346         associated stream.  However, ...  input may not be directly
347         followed by output without an intervening call to a file
348         positioning function, unless the input oepration encounters
349         end-of-file. */
350     fseek(rrd_file, 0, SEEK_CUR);
351
352     
353     /* get exclusive lock to whole file.
354      * lock gets removed when we close the file.
355      */
356     if (LockRRD(rrd_file) != 0) {
357       rrd_set_error("could not lock RRD");
358       rrd_free(&rrd);
359       fclose(rrd_file);
360       return(-1);   
361     } 
362
363     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
364         rrd_set_error("allocating updvals pointer array");
365         rrd_free(&rrd);
366         fclose(rrd_file);
367         return(-1);
368     }
369
370     if ((pdp_temp = malloc(sizeof(rrd_value_t)
371                            *rrd.stat_head->ds_cnt))==NULL){
372         rrd_set_error("allocating pdp_temp ...");
373         free(updvals);
374         rrd_free(&rrd);
375         fclose(rrd_file);
376         return(-1);
377     }
378
379     if ((tmpl_idx = malloc(sizeof(unsigned long)
380                            *(rrd.stat_head->ds_cnt+1)))==NULL){
381         rrd_set_error("allocating tmpl_idx ...");
382         free(pdp_temp);
383         free(updvals);
384         rrd_free(&rrd);
385         fclose(rrd_file);
386         return(-1);
387     }
388     /* initialize template redirector */
389     /* default config example (assume DS 1 is a CDEF DS)
390        tmpl_idx[0] -> 0; (time)
391        tmpl_idx[1] -> 1; (DS 0)
392        tmpl_idx[2] -> 3; (DS 2)
393        tmpl_idx[3] -> 4; (DS 3) */
394     tmpl_idx[0] = 0; /* time */
395     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
396         {
397            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
398               tmpl_idx[ii++]=i;
399         }
400     tmpl_cnt= ii;
401
402     if (template) {
403         char *dsname;
404         unsigned int tmpl_len;
405         dsname = template;
406         tmpl_cnt = 1; /* the first entry is the time */
407         tmpl_len = strlen(template);
408         for(i=0;i<=tmpl_len ;i++) {
409             if (template[i] == ':' || template[i] == '\0') {
410                 template[i] = '\0';
411                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
412                     rrd_set_error("Template contains more DS definitions than RRD");
413                     free(updvals); free(pdp_temp);
414                     free(tmpl_idx); rrd_free(&rrd);
415                     fclose(rrd_file); return(-1);
416                 }
417                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
418                     rrd_set_error("unknown DS name '%s'",dsname);
419                     free(updvals); free(pdp_temp);
420                     free(tmpl_idx); rrd_free(&rrd);
421                     fclose(rrd_file); return(-1);
422                 } else {
423                   /* the first element is always the time */
424                   tmpl_idx[tmpl_cnt-1]++; 
425                   /* go to the next entry on the template */
426                   dsname = &template[i+1];
427                   /* fix the damage we did before */
428                   if (i<tmpl_len) {
429                      template[i]=':';
430                   } 
431
432                 }
433             }       
434         }
435     }
436     if ((pdp_new = malloc(sizeof(rrd_value_t)
437                           *rrd.stat_head->ds_cnt))==NULL){
438         rrd_set_error("allocating pdp_new ...");
439         free(updvals);
440         free(pdp_temp);
441         free(tmpl_idx);
442         rrd_free(&rrd);
443         fclose(rrd_file);
444         return(-1);
445     }
446
447     /* loop through the arguments. */
448     for(arg_i=0; arg_i<argc;arg_i++) {
449         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
450         char *step_start = stepper;
451         char *p;
452         char *parsetime_error = NULL;
453         enum {atstyle, normal} timesyntax;
454         struct time_value ds_tv;
455         if (stepper == NULL){
456                 rrd_set_error("failed duplication argv entry");
457                 free(updvals);
458                 free(pdp_temp);  
459                 free(tmpl_idx);
460                 rrd_free(&rrd);
461                 fclose(rrd_file);
462                 return(-1);
463          }
464         /* initialize all ds input to unknown except the first one
465            which has always got to be set */
466         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467         strcpy(stepper,argv[arg_i]);
468         updvals[0]=stepper;
469         /* separate all ds elements; first must be examined separately
470            due to alternate time syntax */
471         if ((p=strchr(stepper,'@'))!=NULL) {
472             timesyntax = atstyle;
473             *p = '\0';
474             stepper = p+1;
475         } else if ((p=strchr(stepper,':'))!=NULL) {
476             timesyntax = normal;
477             *p = '\0';
478             stepper = p+1;
479         } else {
480             rrd_set_error("expected timestamp not found in data source from %s:...",
481                           argv[arg_i]);
482             free(step_start);
483             break;
484         }
485         ii=1;
486         updvals[tmpl_idx[ii]] = stepper;
487         while (*stepper) {
488             if (*stepper == ':') {
489                 *stepper = '\0';
490                 ii++;
491                 if (ii<tmpl_cnt){                   
492                     updvals[tmpl_idx[ii]] = stepper+1;
493                 }
494             }
495             stepper++;
496         }
497
498         if (ii != tmpl_cnt-1) {
499             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500                           tmpl_cnt-1, ii, argv[arg_i]);
501             free(step_start);
502             break;
503         }
504         
505         /* get the time from the reading ... handle N */
506         if (timesyntax == atstyle) {
507             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
509                 free(step_start);
510                 break;
511             }
512             if (ds_tv.type == RELATIVE_TO_END_TIME ||
513                 ds_tv.type == RELATIVE_TO_START_TIME) {
514                 rrd_set_error("specifying time relative to the 'start' "
515                               "or 'end' makes no sense here: %s",
516                               updvals[0]);
517                 free(step_start);
518                 break;
519             }
520
521             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
523             
524         } else if (strcmp(updvals[0],"N")==0){
525             gettimeofday(&tmp_time, 0);
526             normalize_time(&tmp_time);
527             current_time = tmp_time.tv_sec;
528             current_time_usec = tmp_time.tv_usec;
529         } else {
530             double tmp;
531             tmp = strtod(updvals[0], 0);
532             current_time = floor(tmp);
533             current_time_usec = (long)((tmp - current_time) * 1000000L);
534         }
535         /* dont do any correction for old version RRDs */
536         if(version < 3) 
537             current_time_usec = 0;
538         
539         if(current_time <= rrd.live_head->last_up){
540             rrd_set_error("illegal attempt to update using time %ld when "
541                           "last update time is %ld (minimum one second step)",
542                           current_time, rrd.live_head->last_up);
543             free(step_start);
544             break;
545         }
546         
547         
548         /* seek to the beginning of the rra's */
549         if (rra_current != rra_begin) {
550             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
551                 rrd_set_error("seek error in rrd");
552                 free(step_start);
553                 break;
554             }
555             rra_current = rra_begin;
556         }
557         rra_start = rra_begin;
558
559         /* when was the current pdp started */
560         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
561         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
562
563         /* when did the last pdp_st occur */
564         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
565         occu_pdp_st = current_time - occu_pdp_age;
566         /* interval = current_time - rrd.live_head->last_up; */
567         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
568     
569         if (occu_pdp_st > proc_pdp_st){
570             /* OK we passed the pdp_st moment*/
571             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572                                                               * occurred before the latest
573                                                               * pdp_st moment*/
574             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575             post_int = occu_pdp_age;                         /* how much after it */
576             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
577         } else {
578             pre_int = interval;
579             post_int = 0;
580         }
581
582 #ifdef DEBUG
583         printf(
584                "proc_pdp_age %lu\t"
585                "proc_pdp_st %lu\t" 
586                "occu_pfp_age %lu\t" 
587                "occu_pdp_st %lu\t"
588                "int %lf\t"
589                "pre_int %lf\t"
590                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
591                 occu_pdp_age, occu_pdp_st,
592                interval, pre_int, post_int);
593 #endif
594     
595         /* process the data sources and update the pdp_prep 
596          * area accordingly */
597         for(i=0;i<rrd.stat_head->ds_cnt;i++){
598             enum dst_en dst_idx;
599             dst_idx= dst_conv(rrd.ds_def[i].dst);
600                 /* NOTE: DST_CDEF should never enter this if block, because
601                  * updvals[i+1][0] is initialized to 'U'; unless the caller
602                  * accidently specified a value for the DST_CDEF. To handle 
603                  * this case, an extra check is required. */
604             if((updvals[i+1][0] != 'U') &&
605                    (dst_idx != DST_CDEF) &&
606                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
607                double rate = DNAN;
608                /* the data source type defines how to process the data */
609                 /* pdp_new contains rate * time ... eg the bytes
610                  * transferred during the interval. Doing it this way saves
611                  * a lot of math operations */
612                 
613
614                 switch(dst_idx){
615                 case DST_COUNTER:
616                 case DST_DERIVE:
617                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
619                        if(dst_idx == DST_COUNTER) {
620                           /* simple overflow catcher sugestet by andres kroonmaa */
621                           /* this will fail terribly for non 32 or 64 bit counters ... */
622                           /* are there any others in SNMP land ? */
623                           if (pdp_new[i] < (double)0.0 ) 
624                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
625                           if (pdp_new[i] < (double)0.0 ) 
626                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
627                        }
628                        rate = pdp_new[i] / interval;
629                     }
630                    else {
631                      pdp_new[i]= DNAN;          
632                    }
633                    break;
634                 case DST_ABSOLUTE:
635                     pdp_new[i]= atof(updvals[i+1]);
636                     rate = pdp_new[i] / interval;                 
637                     break;
638                 case DST_GAUGE:
639                     pdp_new[i] = atof(updvals[i+1]) * interval;
640                     rate = pdp_new[i] / interval;                  
641                     break;
642                 default:
643                     rrd_set_error("rrd contains unknown DS type : '%s'",
644                                   rrd.ds_def[i].dst);
645                     break;
646                 }
647                 /* break out of this for loop if the error string is set */
648                 if (rrd_test_error()){
649                     break;
650                 }
651                /* make sure pdp_temp is neither too large or too small
652                 * if any of these occur it becomes unknown ...
653                 * sorry folks ... */
654                if ( ! isnan(rate) && 
655                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
656                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
657                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
658                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
659                   pdp_new[i] = DNAN;
660                }               
661             } else {
662                 /* no news is news all the same */
663                 pdp_new[i] = DNAN;
664             }
665             
666             /* make a copy of the command line argument for the next run */
667 #ifdef DEBUG
668             fprintf(stderr,
669                     "prep ds[%lu]\t"
670                     "last_arg '%s'\t"
671                     "this_arg '%s'\t"
672                     "pdp_new %10.2f\n",
673                     i,
674                     rrd.pdp_prep[i].last_ds,
675                     updvals[i+1], pdp_new[i]);
676 #endif
677             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
678                 strncpy(rrd.pdp_prep[i].last_ds,
679                         updvals[i+1],LAST_DS_LEN-1);
680                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
681             }
682         }
683         /* break out of the argument parsing loop if the error_string is set */
684         if (rrd_test_error()){
685             free(step_start);
686             break;
687         }
688         /* has a pdp_st moment occurred since the last run ? */
689
690         if (proc_pdp_st == occu_pdp_st){
691             /* no we have not passed a pdp_st moment. therefore update is simple */
692
693             for(i=0;i<rrd.stat_head->ds_cnt;i++){
694                 if(isnan(pdp_new[i]))
695                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
696                 else
697                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
698 #ifdef DEBUG
699                 fprintf(stderr,
700                         "NO PDP  ds[%lu]\t"
701                         "value %10.2f\t"
702                         "unkn_sec %5lu\n",
703                         i,
704                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
705                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
706 #endif
707             }   
708         } else {
709             /* an pdp_st has occurred. */
710
711             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
712              * occurred up to the last run.        
713             pdp_new[] contains rate*seconds from the latest run.
714             pdp_temp[] will contain the rate for cdp */
715
716             for(i=0;i<rrd.stat_head->ds_cnt;i++){
717                 /* update pdp_prep to the current pdp_st */
718                 if(isnan(pdp_new[i]))
719                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
720                 else
721                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
722                         pdp_new[i]/(double)interval*(double)pre_int;
723
724                 /* if too much of the pdp_prep is unknown we dump it */
725                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
726                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
727                     (occu_pdp_st-proc_pdp_st <= 
728                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
729                     pdp_temp[i] = DNAN;
730                 } else {
731                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
732                         / (double)( occu_pdp_st
733                                    - proc_pdp_st
734                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
735                 }
736
737                 /* process CDEF data sources; remember each CDEF DS can
738                  * only reference other DS with a lower index number */
739             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
740                    rpnp_t *rpnp;
741                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
742                    /* substitue data values for OP_VARIABLE nodes */
743                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
744                    {
745                           if (rpnp[ii].op == OP_VARIABLE) {
746                                  rpnp[ii].op = OP_NUMBER;
747                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
748                           }
749                    }
750                    /* run the rpn calculator */
751                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
752                           free(rpnp);
753                           break; /* exits the data sources pdp_temp loop */
754                    }
755                 }
756         
757                 /* make pdp_prep ready for the next run */
758                 if(isnan(pdp_new[i])){
759                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
760                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
761                 } else {
762                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
763                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
764                         pdp_new[i]/(double)interval*(double)post_int;
765                 }
766
767 #ifdef DEBUG
768                 fprintf(stderr,
769                         "PDP UPD ds[%lu]\t"
770                         "pdp_temp %10.2f\t"
771                         "new_prep %10.2f\t"
772                         "new_unkn_sec %5lu\n",
773                         i, pdp_temp[i],
774                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
775                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
776 #endif
777             }
778
779                 /* if there were errors during the last loop, bail out here */
780             if (rrd_test_error()){
781                free(step_start);
782                break;
783             }
784
785                 /* compute the number of elapsed pdp_st moments */
786                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
787 #ifdef DEBUG
788                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
789 #endif
790                 if (rra_step_cnt == NULL)
791                 {
792                    rra_step_cnt = (unsigned long *) 
793                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
794                 }
795
796             for(i = 0, rra_start = rra_begin;
797                 i < rrd.stat_head->rra_cnt;
798             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
799                 i++)
800                 {
801                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
802                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
803                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
804         if (start_pdp_offset <= elapsed_pdp_st) {
805            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
806                       rrd.rra_def[i].pdp_cnt + 1;
807             } else {
808                    rra_step_cnt[i] = 0;
809                 }
810
811                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
812                 {
813                    /* If this is a bulk update, we need to skip ahead in the seasonal
814                         * arrays so that they will be correct for the next observed value;
815                         * note that for the bulk update itself, no update will occur to
816                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
817                         * be set to DNAN. */
818            if (rra_step_cnt[i] > 2) 
819                    {
820                           /* skip update by resetting rra_step_cnt[i],
821                            * note that this is not data source specific; this is due
822                            * to the bulk update, not a DNAN value for the specific data
823                            * source. */
824                           rra_step_cnt[i] = 0;
825               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
826                              &last_seasonal_coef);
827                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
828                              &seasonal_coef);
829                    }
830                 
831                   /* periodically run a smoother for seasonal effects */
832                   /* Need to use first cdp parameter buffer to track
833                    * burnin (burnin requires a specific smoothing schedule).
834                    * The CDP_init_seasonal parameter is really an RRA level,
835                    * not a data source within RRA level parameter, but the rra_def
836                    * is read only for rrd_update (not flushed to disk). */
837                   iii = i*(rrd.stat_head -> ds_cnt);
838                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
839                           <= BURNIN_CYCLES)
840                   {
841                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
842                                  > rrd.rra_def[i].row_cnt - 1) {
843                            /* mark off one of the burnin cycles */
844                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
845                        schedule_smooth = 1;
846                          }  
847                   } else {
848                          /* someone has no doubt invented a trick to deal with this
849                           * wrap around, but at least this code is clear. */
850                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
851                              rrd.rra_ptr[i].cur_row)
852                          {
853                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
854                                   * mapping between PDP and CDP */
855                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
856                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
857                                  {
858 #ifdef DEBUG
859                                         fprintf(stderr,
860                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
861                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
862                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
863 #endif
864                                         schedule_smooth = 1;
865                                  }
866              } else {
867                                  /* can't rely on negative numbers because we are working with
868                                   * unsigned values */
869                                  /* Don't need modulus here. If we've wrapped more than once, only
870                                   * one smooth is executed at the end. */
871                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
872                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
873                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
874                                  {
875 #ifdef DEBUG
876                                         fprintf(stderr,
877                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
879                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
880 #endif
881                                         schedule_smooth = 1;
882                                  }
883                          }
884                   }
885
886               rra_current = ftell(rrd_file); 
887                 } /* if cf is DEVSEASONAL or SEASONAL */
888
889         if (rrd_test_error()) break;
890
891                     /* update CDP_PREP areas */
892                     /* loop over data sources within each RRA */
893                     for(ii = 0;
894                         ii < rrd.stat_head->ds_cnt;
895                         ii++)
896                         {
897                         
898                         /* iii indexes the CDP prep area for this data source within the RRA */
899                         iii=i*rrd.stat_head->ds_cnt+ii;
900
901                         if (rrd.rra_def[i].pdp_cnt > 1) {
902                           
903                            if (rra_step_cnt[i] > 0) {
904                            /* If we are in this block, at least 1 CDP value will be written to
905                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
906                                 * to be written, then the "fill in" value is the CDP_secondary_val
907                                 * entry. */
908                                   if (isnan(pdp_temp[ii]))
909                   {
910                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
911                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
912                                   } else {
913                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
914                                           * CDP data entries. No matter the CF, the value is the same because
915                                           * the average, max, min, and last of a list of identical values is
916                                           * the same, namely, the value itself. */
917                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
918                                   }
919                      
920                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
921                                       > rrd.rra_def[i].pdp_cnt*
922                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
923                                   {
924                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
925                                          /* initialize carry over */
926                                          if (current_cf == CF_AVERAGE) {
927                                                    if (isnan(pdp_temp[ii])) { 
928                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
929                                                    } else {
930                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
931                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
932                                                    }
933                                          } else {
934                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
935                                          }
936                                   } else {
937                                          rrd_value_t cum_val, cur_val; 
938                                      switch (current_cf) {
939                                                 case CF_AVERAGE:
940                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
941                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
942                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
943                                                (cum_val + cur_val * start_pdp_offset) /
944                                            (rrd.rra_def[i].pdp_cnt
945                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
946                                                    /* initialize carry over value */
947                                                    if (isnan(pdp_temp[ii])) { 
948                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
949                                                    } else {
950                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
951                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
952                                                    }
953                                                    break;
954                                                 case CF_MAXIMUM:
955                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
956                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
957 #ifdef DEBUG
958                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
959                                                           isnan(pdp_temp[ii])) {
960                                                      fprintf(stderr,
961                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
962                                                                 i,ii);
963                                                          exit(-1);
964                                                   }
965 #endif
966                                                   if (cur_val > cum_val)
967                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
968                                                   else
969                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
970                                                   /* initialize carry over value */
971                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
972                                                   break;
973                                                 case CF_MINIMUM:
974                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
975                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
976 #ifdef DEBUG
977                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978                                                           isnan(pdp_temp[ii])) {
979                                                      fprintf(stderr,
980                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
981                                                                 i,ii);
982                                                          exit(-1);
983                                                   }
984 #endif
985                                                   if (cur_val < cum_val)
986                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
987                                                   else
988                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989                                                   /* initialize carry over value */
990                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991                                                   break;
992                                                 case CF_LAST:
993                                                 default:
994                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
995                                                    /* initialize carry over value */
996                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
997                                                 break;
998                                          }
999                                   } /* endif meets xff value requirement for a valid value */
1000                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1001                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1002                                   if (isnan(pdp_temp[ii]))
1003                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1004                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1005                                   else
1006                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1007                } else  /* rra_step_cnt[i]  == 0 */
1008                            {
1009 #ifdef DEBUG
1010                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1011                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1012                                          i,ii);
1013                                   } else {
1014                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1015                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1016                                   }
1017 #endif
1018                                   if (isnan(pdp_temp[ii])) {
1019                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1020                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1021                                   {
1022                                          if (current_cf == CF_AVERAGE) {
1023                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1024                                                    elapsed_pdp_st;
1025                                          } else {
1026                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027                                          }
1028 #ifdef DEBUG
1029                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1030                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1031 #endif
1032                                   } else {
1033                                          switch (current_cf) {
1034                                          case CF_AVERAGE:
1035                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1036                                                    elapsed_pdp_st;
1037                                                 break;
1038                                          case CF_MINIMUM:
1039                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1040                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1041                                                 break; 
1042                                          case CF_MAXIMUM:
1043                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045                                                 break; 
1046                                          case CF_LAST:
1047                                          default:
1048                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049                                                 break;
1050                                          }
1051                                   }
1052                            }
1053                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1054                            if (elapsed_pdp_st > 2)
1055                            {
1056                                    switch (current_cf) {
1057                                    case CF_AVERAGE:
1058                                    default:
1059                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1060                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1061                                           break;
1062                    case CF_SEASONAL:
1063                                    case CF_DEVSEASONAL:
1064                                           /* need to update cached seasonal values, so they are consistent
1065                                            * with the bulk update */
1066                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1067                                            * CDP_last_deviation are the same. */
1068                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1069                                                  last_seasonal_coef[ii];
1070                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1071                                                  seasonal_coef[ii];
1072                                           break;
1073                    case CF_HWPREDICT:
1074                                           /* need to update the null_count and last_null_count.
1075                                            * even do this for non-DNAN pdp_temp because the
1076                                            * algorithm is not learning from batch updates. */
1077                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1078                                                  elapsed_pdp_st;
1079                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1080                                                  elapsed_pdp_st - 1;
1081                                           /* fall through */
1082                                    case CF_DEVPREDICT:
1083                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1084                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1085                                           break;
1086                    case CF_FAILURES:
1087                                           /* do not count missed bulk values as failures */
1088                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1089                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1090                                           /* need to reset violations buffer.
1091                                            * could do this more carefully, but for now, just
1092                                            * assume a bulk update wipes away all violations. */
1093                       erase_violations(&rrd, iii, i);
1094                                           break;
1095                                    }
1096                            } 
1097                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1098
1099                         if (rrd_test_error()) break;
1100
1101                         } /* endif data sources loop */
1102         } /* end RRA Loop */
1103
1104                 /* this loop is only entered if elapsed_pdp_st < 3 */
1105                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1106                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1107                 {
1108                for(i = 0, rra_start = rra_begin;
1109                    i < rrd.stat_head->rra_cnt;
1110                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1111                    i++)
1112                    {
1113                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1114
1115                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1116                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1117                           {
1118                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1119                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1120                                 &seasonal_coef);
1121                  rra_current = ftell(rrd_file);
1122                           }
1123                           if (rrd_test_error()) break;
1124                       /* loop over data sources within each RRA */
1125                       for(ii = 0;
1126                           ii < rrd.stat_head->ds_cnt;
1127                           ii++)
1128                           {
1129                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1130                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1131                                     scratch_idx, seasonal_coef);
1132                           }
1133            } /* end RRA Loop */
1134                    if (rrd_test_error()) break;
1135             } /* end elapsed_pdp_st loop */
1136
1137                 if (rrd_test_error()) break;
1138
1139                 /* Ready to write to disk */
1140                 /* Move sequentially through the file, writing one RRA at a time.
1141                  * Note this architecture divorces the computation of CDP with
1142                  * flushing updated RRA entries to disk. */
1143             for(i = 0, rra_start = rra_begin;
1144                 i < rrd.stat_head->rra_cnt;
1145             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1146                 i++) {
1147                 /* is there anything to write for this RRA? If not, continue. */
1148         if (rra_step_cnt[i] == 0) continue;
1149
1150                 /* write the first row */
1151 #ifdef DEBUG
1152         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1153 #endif
1154             rrd.rra_ptr[i].cur_row++;
1155             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1156                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1157                 /* position on the first row */
1158                 rra_pos_tmp = rra_start +
1159                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1160                 if(rra_pos_tmp != rra_current) {
1161                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1162                       rrd_set_error("seek error in rrd");
1163                       break;
1164                    }
1165                    rra_current = rra_pos_tmp;
1166                 }
1167
1168 #ifdef DEBUG
1169             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1170 #endif
1171                 scratch_idx = CDP_primary_val;
1172                 if (pcdp_summary != NULL)
1173                 {
1174                    rra_time = (current_time - current_time 
1175                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1176                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1177                 }
1178                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1179                    pcdp_summary, &rra_time);
1180                 if (rrd_test_error()) break;
1181
1182                 /* write other rows of the bulk update, if any */
1183                 scratch_idx = CDP_secondary_val;
1184                 for ( ; rra_step_cnt[i] > 1; 
1185                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1186                 {
1187                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1188                    {
1189 #ifdef DEBUG
1190               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1191                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1192 #endif
1193                           /* wrap */
1194                           rrd.rra_ptr[i].cur_row = 0;
1195                           /* seek back to beginning of current rra */
1196                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1197                           {
1198                          rrd_set_error("seek error in rrd");
1199                          break;
1200                           }
1201 #ifdef DEBUG
1202                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1203 #endif
1204                           rra_current = rra_start;
1205                    }
1206                    if (pcdp_summary != NULL)
1207                    {
1208                       rra_time = (current_time - current_time 
1209                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1210                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1211                    }
1212                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1213                       pcdp_summary, &rra_time);
1214                 }
1215                 
1216                 if (rrd_test_error())
1217                   break;
1218                 } /* RRA LOOP */
1219
1220             /* break out of the argument parsing loop if error_string is set */
1221             if (rrd_test_error()){
1222                    free(step_start);
1223                    break;
1224             } 
1225             
1226         } /* endif a pdp_st has occurred */ 
1227         rrd.live_head->last_up = current_time;
1228         rrd.live_head->last_up_usec = current_time_usec; 
1229         free(step_start);
1230     } /* function argument loop */
1231
1232     if (seasonal_coef != NULL) free(seasonal_coef);
1233     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1234         if (rra_step_cnt != NULL) free(rra_step_cnt);
1235     rpnstack_free(&rpnstack);
1236
1237     /* if we got here and if there is an error and if the file has not been
1238      * written to, then close things up and return. */
1239     if (rrd_test_error()) {
1240         free(updvals);
1241         free(tmpl_idx);
1242         rrd_free(&rrd);
1243         free(pdp_temp);
1244         free(pdp_new);
1245         fclose(rrd_file);
1246         return(-1);
1247     }
1248
1249     /* aargh ... that was tough ... so many loops ... anyway, its done.
1250      * we just need to write back the live header portion now*/
1251
1252     if (fseek(rrd_file, (sizeof(stat_head_t)
1253                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1254                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1255               SEEK_SET) != 0) {
1256         rrd_set_error("seek rrd for live header writeback");
1257         free(updvals);
1258         free(tmpl_idx);
1259         rrd_free(&rrd);
1260         free(pdp_temp);
1261         free(pdp_new);
1262         fclose(rrd_file);
1263         return(-1);
1264     }
1265
1266     if(version >= 3) {
1267             if(fwrite( rrd.live_head,
1268                        sizeof(live_head_t), 1, rrd_file) != 1){
1269                 rrd_set_error("fwrite live_head to rrd");
1270                 free(updvals);
1271                 rrd_free(&rrd);
1272                 free(tmpl_idx);
1273                 free(pdp_temp);
1274                 free(pdp_new);
1275                 fclose(rrd_file);
1276                 return(-1);
1277             }
1278     }
1279     else {
1280             if(fwrite( &rrd.live_head->last_up,
1281                        sizeof(time_t), 1, rrd_file) != 1){
1282                 rrd_set_error("fwrite live_head to rrd");
1283                 free(updvals);
1284                 rrd_free(&rrd);
1285                 free(tmpl_idx);
1286                 free(pdp_temp);
1287                 free(pdp_new);
1288                 fclose(rrd_file);
1289                 return(-1);
1290             }
1291     }
1292             
1293
1294     if(fwrite( rrd.pdp_prep,
1295                sizeof(pdp_prep_t),
1296                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1297         rrd_set_error("ftwrite pdp_prep to rrd");
1298         free(updvals);
1299         rrd_free(&rrd);
1300         free(tmpl_idx);
1301         free(pdp_temp);
1302         free(pdp_new);
1303         fclose(rrd_file);
1304         return(-1);
1305     }
1306
1307     if(fwrite( rrd.cdp_prep,
1308                sizeof(cdp_prep_t),
1309                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1310        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1311
1312         rrd_set_error("ftwrite cdp_prep to rrd");
1313         free(updvals);
1314         free(tmpl_idx);
1315         rrd_free(&rrd);
1316         free(pdp_temp);
1317         free(pdp_new);
1318         fclose(rrd_file);
1319         return(-1);
1320     }
1321
1322     if(fwrite( rrd.rra_ptr,
1323                sizeof(rra_ptr_t), 
1324                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1325         rrd_set_error("fwrite rra_ptr to rrd");
1326         free(updvals);
1327         free(tmpl_idx);
1328         rrd_free(&rrd);
1329         free(pdp_temp);
1330         free(pdp_new);
1331         fclose(rrd_file);
1332         return(-1);
1333     }
1334
1335     /* OK now close the files and free the memory */
1336     if(fclose(rrd_file) != 0){
1337         rrd_set_error("closing rrd");
1338         free(updvals);
1339         free(tmpl_idx);
1340         rrd_free(&rrd);
1341         free(pdp_temp);
1342         free(pdp_new);
1343         return(-1);
1344     }
1345
1346     /* calling the smoothing code here guarantees at most
1347          * one smoothing operation per rrd_update call. Unfortunately,
1348          * it is possible with bulk updates, or a long-delayed update
1349          * for smoothing to occur off-schedule. This really isn't
1350          * critical except during the burning cycles. */
1351         if (schedule_smooth)
1352         {
1353 #ifndef WIN32
1354           rrd_file = fopen(filename,"r+");
1355 #else
1356           rrd_file = fopen(filename,"rb+");
1357 #endif
1358           rra_start = rra_begin;
1359           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1360           {
1361             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1362                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1363             {
1364 #ifdef DEBUG
1365               fprintf(stderr,"Running smoother for rra %ld\n",i);
1366 #endif
1367               apply_smoother(&rrd,i,rra_start,rrd_file);
1368               if (rrd_test_error())
1369                 break;
1370             }
1371             rra_start += rrd.rra_def[i].row_cnt
1372               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1373           }
1374           fclose(rrd_file);
1375         }
1376     rrd_free(&rrd);
1377     free(updvals);
1378     free(tmpl_idx);
1379     free(pdp_new);
1380     free(pdp_temp);
1381     return(0);
1382 }
1383
/*
 * Try to take an exclusive lock covering the whole RRD file.
 *
 * The request is made in non-waiting mode (F_SETLK on POSIX,
 * _LK_NBLCK on WIN32), so the status of the underlying call is
 * handed straight back to the caller.
 * The lock gets removed when the file is closed.
 *
 * rrdfile: open stream of the RRD file to lock.
 *
 * returns 0 on success
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stream */
    int rc;                     /* status of the locking call */
#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* l_start is relative to start of file */
    whole_file.l_start = 0;         /* begin at the first byte */
    whole_file.l_len = 0;           /* zero length: whole file */
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    rc = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* the file size is needed as the byte count handed to _locking;
     * without it the lock attempt is reported as failed */
    rc = (_fstat(fd, &info) == 0)
        ? _locking(fd, _LK_NBLCK, info.st_size)
        : -1;
#endif

    return rc;
}
1420
1421
1422 info_t
1423 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1424                unsigned short CDP_scratch_idx, FILE *rrd_file,
1425                    info_t *pcdp_summary, time_t *rra_time)
1426 {
1427    unsigned long ds_idx, cdp_idx;
1428    infoval iv;
1429   
1430    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1431    {
1432       /* compute the cdp index */
1433       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1434 #ifdef DEBUG
1435           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1436              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1437              rrd -> rra_def[rra_idx].cf_nam);
1438 #endif 
1439       if (pcdp_summary != NULL)
1440           {
1441              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1442              /* append info to the return hash */
1443                  pcdp_summary = info_push(pcdp_summary,
1444                  sprintf_alloc("[%d]RRA[%lu]DS[%s]",
1445                  *rra_time, rra_idx, rrd->ds_def[ds_idx].ds_nam),
1446          RD_I_VAL, iv);
1447           }
1448           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1449                  sizeof(rrd_value_t),1,rrd_file) != 1)
1450           { 
1451              rrd_set_error("writing rrd");
1452                  return;
1453           }
1454           *rra_current += sizeof(rrd_value_t);
1455         }
1456         return (pcdp_summary);
1457 }