removed extra line from top
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2004
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.17  2004/05/26 22:11:12  oetiker
9  * reduce compiler warnings. Many small fixes. -- Mike Slifcak <slif@bellsouth.net>
10  *
11  * Revision 1.16  2004/05/25 20:52:16  oetiker
12  * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
13  *
14  * Revision 1.15  2004/05/25 20:51:49  oetiker
15  * Update displayed copyright messages to be consistent. -- Mike Slifcak
16  *
17  * Revision 1.14  2003/11/11 19:46:21  oetiker
18  * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
19  *
20  * Revision 1.13  2003/11/11 19:38:03  oetiker
21  * rrd files should NOT change size ever ... bulk update code wa buggy.
22  * -- David M. Grimes <dgrimes@navisite.com>
23  *
24  * Revision 1.12  2003/09/04 13:16:12  oetiker
25  * should not assigne but compare ... grrrrr
26  *
27  * Revision 1.11  2003/09/02 21:58:35  oetiker
28  * be pickier about what we accept in rrd_update. Complain if things do not work out
29  *
30  * Revision 1.10  2003/04/29 19:14:12  jake
31  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
32  * Also revert accidental addition of -I to aclocal MakeMakefile.
33  *
34  * Revision 1.9  2003/04/25 18:35:08  jake
35  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
36  *
37  * Revision 1.8  2003/03/31 21:22:12  oetiker
38  * enables RRDtool updates with microsecond or in case of windows millisecond
39  * precision. This is needed to reduce time measurement error when archive step
40  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
41  *
42  * Revision 1.7  2003/02/13 07:05:27  oetiker
43  * Find attached the patch I promised to send to you. Please note that there
44  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
45  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
46  * library is identical to librrd, but it contains support code for per-thread
47  * global variables currently used for error information only. This is similar
48  * to how errno per-thread variables are implemented.  librrd_th must be linked
49  * alongside of libpthred
50  *
51  * There is also a new file "THREADS", holding some documentation.
52  *
53  * -- Peter Stamfest <peter@stamfest.at>
54  *
55  * Revision 1.6  2002/02/01 20:34:49  oetiker
56  * fixed version number and date/time
57  *
58  * Revision 1.5  2001/05/09 05:31:01  oetiker
59  * Bug fix: when update of multiple PDP/CDP RRAs coincided
60  * with interpolation of multiple PDPs an incorrect value was
61  * stored as the CDP. Especially evident for GAUGE data sources.
62  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
63  *
64  * Revision 1.4  2001/03/10 23:54:41  oetiker
65  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
66  * parser and calculator from rrd_graph and puts then in a new file,
67  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
68  * clean-up of aberrant behavior stuff, including a bug fix.
69  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
70  * -- Jake Brutlag <jakeb@corp.webtv.net>
71  *
72  * Revision 1.3  2001/03/04 13:01:55  oetiker
73  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
74  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
75  * This is backwards compatible! But new files using the Aberrant stuff are not readable
76  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
77  * -- Jake Brutlag <jakeb@corp.webtv.net>
78  *
79  * Revision 1.2  2001/03/04 11:14:25  oetiker
80  * added at-style-time@value:value syntax to rrd_update
81  * --  Dave Bodenstab <imdave@mcs.net>
82  *
83  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
84  * checkin
85  *
86  *****************************************************************************/
87
88 #include "rrd_tool.h"
89 #include <sys/types.h>
90 #include <fcntl.h>
91 #ifdef HAVE_MMAP
92  #include <sys/mman.h>
93 #endif
94
95 #ifdef WIN32
96  #include <sys/locking.h>
97  #include <sys/stat.h>
98  #include <io.h>
99 #endif
100
101 #include "rrd_hw.h"
102 #include "rrd_rpncalc.h"
103
104 #include "rrd_is_thread_safe.h"
105
106 #ifdef WIN32
107 /*
108  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
109  * replacement.
110  */
111 #include <sys/timeb.h>
112
113 struct timeval {
114         time_t tv_sec; /* seconds */
115         long tv_usec;  /* microseconds */
116 };
117
118 struct __timezone {
119         int  tz_minuteswest; /* minutes W of Greenwich */
120         int  tz_dsttime;     /* type of dst correction */
121 };
122
123 static gettimeofday(struct timeval *t, struct __timezone *tz) {
124         
125         struct timeb current_time;
126
127         _ftime(&current_time);
128         
129         t->tv_sec  = current_time.time;
130         t->tv_usec = current_time.millitm * 1000;
131 }
132
133 #endif
134 /*
135  * normilize time as returned by gettimeofday. usec part must
136  * be always >= 0
137  */
138 static void normalize_time(struct timeval *t)
139 {
140         if(t->tv_usec < 0) {
141                 t->tv_sec--;
142                 t->tv_usec += 1000000L;
143         }
144 }
145
146 /* Local prototypes */
147 int LockRRD(FILE *rrd_file);
148 #ifdef HAVE_MMAP
149 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
150                                         unsigned long *rra_current,
151                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
152                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
153 #else
154 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
155                                         unsigned long *rra_current,
156                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
157                                         info_t *pcdp_summary, time_t *rra_time);
158 #endif
159 int rrd_update_r(char *filename, char *template, int argc, char **argv);
160 int _rrd_update(char *filename, char *template, int argc, char **argv, 
161                                         info_t*);
162
163 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
164
165
166 #ifdef STANDALONE
167 int 
168 main(int argc, char **argv){
169         rrd_update(argc,argv);
170         if (rrd_test_error()) {
171                 printf("RRDtool 1.1.x  Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
172                         "Usage: rrdupdate filename\n"
173                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
174                         "\t\t\ttime|N:value[:value...]\n\n"
175                         "\t\t\tat-time@value[:value...]\n\n"
176                         "\t\t\t[ time:value[:value...] ..]\n\n");
177                                    
178                 printf("ERROR: %s\n",rrd_get_error());
179                 rrd_clear_error();                                                            
180                 return 1;
181         }
182         return 0;
183 }
184 #endif
185
186 info_t *rrd_update_v(int argc, char **argv)
187 {
188     char             *template = NULL;          
189         info_t *result = NULL;
190         infoval rc;
191
192     while (1) {
193                 static struct option long_options[] =
194                         {
195                                 {"template",      required_argument, 0, 't'},
196                                 {0,0,0,0}
197                         };
198                 int option_index = 0;
199                 int opt;
200                 opt = getopt_long(argc, argv, "t:", 
201                                                   long_options, &option_index);
202                 
203                 if (opt == EOF)
204                         break;
205                 
206                 switch(opt) {
207                 case 't':
208                         template = optarg;
209                         break;
210                 
211                 case '?':
212                         rrd_set_error("unknown option '%s'",argv[optind-1]);
213             rc.u_int = -1;
214                         goto end_tag;
215                 }
216     }
217
218     /* need at least 2 arguments: filename, data. */
219     if (argc-optind < 2) {
220                 rrd_set_error("Not enough arguments");
221         rc.u_int = -1;
222                 goto end_tag;
223     }
224     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
225         rc.u_int = _rrd_update(argv[optind], template,
226                       argc - optind - 1, argv + optind + 1, result);
227     result->value.u_int = rc.u_int;
228 end_tag:
229     return result;
230 }
231
232 int
233 rrd_update(int argc, char **argv)
234 {
235     char             *template = NULL;          
236     int rc;
237
238     while (1) {
239                 static struct option long_options[] =
240                         {
241                                 {"template",      required_argument, 0, 't'},
242                                 {0,0,0,0}
243                         };
244                 int option_index = 0;
245                 int opt;
246                 opt = getopt_long(argc, argv, "t:", 
247                                                   long_options, &option_index);
248                 
249                 if (opt == EOF)
250                         break;
251                 
252                 switch(opt) {
253                 case 't':
254                         template = optarg;
255                         break;
256                 
257                 case '?':
258                         rrd_set_error("unknown option '%s'",argv[optind-1]);
259                         return(-1);
260                 }
261     }
262
263     /* need at least 2 arguments: filename, data. */
264     if (argc-optind < 2) {
265                 rrd_set_error("Not enough arguments");
266
267                 return -1;
268     }
269  
270         rc = rrd_update_r(argv[optind], template,
271                       argc - optind - 1, argv + optind + 1);
272     return rc;
273 }
274
275 int
276 rrd_update_r(char *filename, char *template, int argc, char **argv)
277 {
278    return _rrd_update(filename, template, argc, argv, NULL);
279 }
280
281 int
282 _rrd_update(char *filename, char *template, int argc, char **argv, 
283    info_t *pcdp_summary)
284 {
285
286     int              arg_i = 2;
287     short            j;
288     unsigned long    i,ii,iii=1;
289
290     unsigned long    rra_begin;          /* byte pointer to the rra
291                                           * area in the rrd file.  this
292                                           * pointer never changes value */
293     unsigned long    rra_start;          /* byte pointer to the rra
294                                           * area in the rrd file.  this
295                                           * pointer changes as each rrd is
296                                           * processed. */
297     unsigned long    rra_current;        /* byte pointer to the current write
298                                           * spot in the rrd file. */
299     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
300     double           interval,
301         pre_int,post_int;                /* interval between this and
302                                           * the last run */
303     unsigned long    proc_pdp_st;        /* which pdp_st was the last
304                                           * to be processed */
305     unsigned long    occu_pdp_st;        /* when was the pdp_st
306                                           * before the last update
307                                           * time */
308     unsigned long    proc_pdp_age;       /* how old was the data in
309                                           * the pdp prep area when it
310                                           * was last updated */
311     unsigned long    occu_pdp_age;       /* how long ago was the last
312                                           * pdp_step time */
313     rrd_value_t      *pdp_new;           /* prepare the incoming data
314                                           * to be added the the
315                                           * existing entry */
316     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
317                                           * to be added the the
318                                           * cdp values */
319
320     long             *tmpl_idx;          /* index representing the settings
321                                             transported by the template index */
322     unsigned long    tmpl_cnt = 2;       /* time and data */
323
324     FILE             *rrd_file;
325     rrd_t            rrd;
326     time_t           current_time;
327         time_t           rra_time; /* time of update for a RRA */
328     unsigned long    current_time_usec;  /* microseconds part of current time */
329     struct timeval   tmp_time;           /* used for time conversion */
330
331     char             **updvals;
332     int              schedule_smooth = 0;
333         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
334                                          /* a vector of future Holt-Winters seasonal coefs */
335     unsigned long    elapsed_pdp_st;
336                                          /* number of elapsed PDP steps since last update */
337     unsigned long    *rra_step_cnt = NULL;
338                                          /* number of rows to be updated in an RRA for a data
339                                           * value. */
340     unsigned long    start_pdp_offset;
341                                          /* number of PDP steps since the last update that
342                                           * are assigned to the first CDP to be generated
343                                           * since the last update. */
344     unsigned short   scratch_idx;
345                                          /* index into the CDP scratch array */
346     enum cf_en       current_cf;
347                                          /* numeric id of the current consolidation function */
348     rpnstack_t       rpnstack; /* used for COMPUTE DS */
349     int              version;  /* rrd version */
350     char             *endptr; /* used in the conversion */
351 #ifdef HAVE_MMAP
352     void             *rrd_mmaped_file;
353     unsigned long    rrd_filesize;
354 #endif
355
356     rpnstack_init(&rpnstack);
357
358     /* need at least 1 arguments: data. */
359     if (argc < 1) {
360         rrd_set_error("Not enough arguments");
361         return -1;
362     }
363     
364     
365
366     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
367         return -1;
368     }
369     /* initialize time */
370     version = atoi(rrd.stat_head->version);
371     gettimeofday(&tmp_time, 0);
372     normalize_time(&tmp_time);
373     current_time = tmp_time.tv_sec;
374     if(version >= 3) {
375         current_time_usec = tmp_time.tv_usec;
376     }
377     else {
378         current_time_usec = 0;
379     }
380
381     rra_current = rra_start = rra_begin = ftell(rrd_file);
382     /* This is defined in the ANSI C standard, section 7.9.5.3:
383
384         When a file is opened with udpate mode ('+' as the second
385         or third character in the ... list of mode argument
386         variables), both input and ouptut may be performed on the
387         associated stream.  However, ...  input may not be directly
388         followed by output without an intervening call to a file
389         positioning function, unless the input oepration encounters
390         end-of-file. */
391 #ifdef HAVE_MMAP
392     fseek(rrd_file, 0, SEEK_END);
393     rrd_filesize = ftell(rrd_file);
394     fseek(rrd_file, rra_current, SEEK_SET);
395 #else
396     fseek(rrd_file, 0, SEEK_CUR);
397 #endif
398
399     
400     /* get exclusive lock to whole file.
401      * lock gets removed when we close the file.
402      */
403     if (LockRRD(rrd_file) != 0) {
404       rrd_set_error("could not lock RRD");
405       rrd_free(&rrd);
406       fclose(rrd_file);
407       return(-1);   
408     } 
409
410     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
411         rrd_set_error("allocating updvals pointer array");
412         rrd_free(&rrd);
413         fclose(rrd_file);
414         return(-1);
415     }
416
417     if ((pdp_temp = malloc(sizeof(rrd_value_t)
418                            *rrd.stat_head->ds_cnt))==NULL){
419         rrd_set_error("allocating pdp_temp ...");
420         free(updvals);
421         rrd_free(&rrd);
422         fclose(rrd_file);
423         return(-1);
424     }
425
426     if ((tmpl_idx = malloc(sizeof(unsigned long)
427                            *(rrd.stat_head->ds_cnt+1)))==NULL){
428         rrd_set_error("allocating tmpl_idx ...");
429         free(pdp_temp);
430         free(updvals);
431         rrd_free(&rrd);
432         fclose(rrd_file);
433         return(-1);
434     }
435     /* initialize template redirector */
436     /* default config example (assume DS 1 is a CDEF DS)
437        tmpl_idx[0] -> 0; (time)
438        tmpl_idx[1] -> 1; (DS 0)
439        tmpl_idx[2] -> 3; (DS 2)
440        tmpl_idx[3] -> 4; (DS 3) */
441     tmpl_idx[0] = 0; /* time */
442     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
443         {
444            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
445               tmpl_idx[ii++]=i;
446         }
447     tmpl_cnt= ii;
448
449     if (template) {
450         char *dsname;
451         unsigned int tmpl_len;
452         dsname = template;
453         tmpl_cnt = 1; /* the first entry is the time */
454         tmpl_len = strlen(template);
455         for(i=0;i<=tmpl_len ;i++) {
456             if (template[i] == ':' || template[i] == '\0') {
457                 template[i] = '\0';
458                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
459                     rrd_set_error("Template contains more DS definitions than RRD");
460                     free(updvals); free(pdp_temp);
461                     free(tmpl_idx); rrd_free(&rrd);
462                     fclose(rrd_file); return(-1);
463                 }
464                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
465                     rrd_set_error("unknown DS name '%s'",dsname);
466                     free(updvals); free(pdp_temp);
467                     free(tmpl_idx); rrd_free(&rrd);
468                     fclose(rrd_file); return(-1);
469                 } else {
470                   /* the first element is always the time */
471                   tmpl_idx[tmpl_cnt-1]++; 
472                   /* go to the next entry on the template */
473                   dsname = &template[i+1];
474                   /* fix the damage we did before */
475                   if (i<tmpl_len) {
476                      template[i]=':';
477                   } 
478
479                 }
480             }       
481         }
482     }
483     if ((pdp_new = malloc(sizeof(rrd_value_t)
484                           *rrd.stat_head->ds_cnt))==NULL){
485         rrd_set_error("allocating pdp_new ...");
486         free(updvals);
487         free(pdp_temp);
488         free(tmpl_idx);
489         rrd_free(&rrd);
490         fclose(rrd_file);
491         return(-1);
492     }
493
494 #ifdef HAVE_MMAP
495     rrd_mmaped_file = mmap(0, 
496                         rrd_filesize, 
497                         PROT_READ | PROT_WRITE, 
498                         MAP_SHARED, 
499                         fileno(rrd_file), 
500                         0);
501     if (rrd_mmaped_file == MAP_FAILED) {
502         rrd_set_error("error mmapping file %s", filename);
503         free(updvals);
504         free(pdp_temp);
505         free(tmpl_idx);
506         rrd_free(&rrd);
507         fclose(rrd_file);
508         return(-1);
509     }
510 #endif
511     /* loop through the arguments. */
512     for(arg_i=0; arg_i<argc;arg_i++) {
513         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
514         char *step_start = stepper;
515         char *p;
516         char *parsetime_error = NULL;
517         enum {atstyle, normal} timesyntax;
518         struct rrd_time_value ds_tv;
519         if (stepper == NULL){
520                 rrd_set_error("failed duplication argv entry");
521                 free(updvals);
522                 free(pdp_temp);  
523                 free(tmpl_idx);
524                 rrd_free(&rrd);
525 #ifdef HAVE_MMAP
526                 munmap(rrd_mmaped_file, rrd_filesize);
527 #endif
528                 fclose(rrd_file);
529                 return(-1);
530          }
531         /* initialize all ds input to unknown except the first one
532            which has always got to be set */
533         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
534         strcpy(stepper,argv[arg_i]);
535         updvals[0]=stepper;
536         /* separate all ds elements; first must be examined separately
537            due to alternate time syntax */
538         if ((p=strchr(stepper,'@'))!=NULL) {
539             timesyntax = atstyle;
540             *p = '\0';
541             stepper = p+1;
542         } else if ((p=strchr(stepper,':'))!=NULL) {
543             timesyntax = normal;
544             *p = '\0';
545             stepper = p+1;
546         } else {
547             rrd_set_error("expected timestamp not found in data source from %s:...",
548                           argv[arg_i]);
549             free(step_start);
550             break;
551         }
552         ii=1;
553         updvals[tmpl_idx[ii]] = stepper;
554         while (*stepper) {
555             if (*stepper == ':') {
556                 *stepper = '\0';
557                 ii++;
558                 if (ii<tmpl_cnt){                   
559                     updvals[tmpl_idx[ii]] = stepper+1;
560                 }
561             }
562             stepper++;
563         }
564
565         if (ii != tmpl_cnt-1) {
566             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
567                           tmpl_cnt-1, ii, argv[arg_i]);
568             free(step_start);
569             break;
570         }
571         
572         /* get the time from the reading ... handle N */
573         if (timesyntax == atstyle) {
574             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
575                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
576                 free(step_start);
577                 break;
578             }
579             if (ds_tv.type == RELATIVE_TO_END_TIME ||
580                 ds_tv.type == RELATIVE_TO_START_TIME) {
581                 rrd_set_error("specifying time relative to the 'start' "
582                               "or 'end' makes no sense here: %s",
583                               updvals[0]);
584                 free(step_start);
585                 break;
586             }
587
588             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
589             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
590             
591         } else if (strcmp(updvals[0],"N")==0){
592             gettimeofday(&tmp_time, 0);
593             normalize_time(&tmp_time);
594             current_time = tmp_time.tv_sec;
595             current_time_usec = tmp_time.tv_usec;
596         } else {
597             double tmp;
598             tmp = strtod(updvals[0], 0);
599             current_time = floor(tmp);
600             current_time_usec = (long)((tmp - current_time) * 1000000L);
601         }
602         /* dont do any correction for old version RRDs */
603         if(version < 3) 
604             current_time_usec = 0;
605         
606         if(current_time <= rrd.live_head->last_up){
607             rrd_set_error("illegal attempt to update using time %ld when "
608                           "last update time is %ld (minimum one second step)",
609                           current_time, rrd.live_head->last_up);
610             free(step_start);
611             break;
612         }
613         
614         
615         /* seek to the beginning of the rra's */
616         if (rra_current != rra_begin) {
617 #ifndef HAVE_MMAP
618             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
619                 rrd_set_error("seek error in rrd");
620                 free(step_start);
621                 break;
622             }
623 #endif
624             rra_current = rra_begin;
625         }
626         rra_start = rra_begin;
627
628         /* when was the current pdp started */
629         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
630         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
631
632         /* when did the last pdp_st occur */
633         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
634         occu_pdp_st = current_time - occu_pdp_age;
635         /* interval = current_time - rrd.live_head->last_up; */
636         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
637     
638         if (occu_pdp_st > proc_pdp_st){
639             /* OK we passed the pdp_st moment*/
640             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
641                                                               * occurred before the latest
642                                                               * pdp_st moment*/
643             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
644             post_int = occu_pdp_age;                         /* how much after it */
645             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
646         } else {
647             pre_int = interval;
648             post_int = 0;
649         }
650
651 #ifdef DEBUG
652         printf(
653                "proc_pdp_age %lu\t"
654                "proc_pdp_st %lu\t" 
655                "occu_pfp_age %lu\t" 
656                "occu_pdp_st %lu\t"
657                "int %lf\t"
658                "pre_int %lf\t"
659                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
660                 occu_pdp_age, occu_pdp_st,
661                interval, pre_int, post_int);
662 #endif
663     
664         /* process the data sources and update the pdp_prep 
665          * area accordingly */
666         for(i=0;i<rrd.stat_head->ds_cnt;i++){
667             enum dst_en dst_idx;
668             dst_idx= dst_conv(rrd.ds_def[i].dst);
669                 /* NOTE: DST_CDEF should never enter this if block, because
670                  * updvals[i+1][0] is initialized to 'U'; unless the caller
671                  * accidently specified a value for the DST_CDEF. To handle 
672                  * this case, an extra check is required. */
673             if((updvals[i+1][0] != 'U') &&
674                    (dst_idx != DST_CDEF) &&
675                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
676                double rate = DNAN;
677                /* the data source type defines how to process the data */
678                 /* pdp_new contains rate * time ... eg the bytes
679                  * transferred during the interval. Doing it this way saves
680                  * a lot of math operations */
681                 
682
683                 switch(dst_idx){
684                 case DST_COUNTER:
685                 case DST_DERIVE:
686                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
687                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
688                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
689                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
690                                  break;
691                             }
692                        }
693                        if (rrd_test_error()){
694                             break;
695                        }
696                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
697                        if(dst_idx == DST_COUNTER) {
698                           /* simple overflow catcher suggested by Andres Kroonmaa */
699                           /* this will fail terribly for non 32 or 64 bit counters ... */
700                           /* are there any others in SNMP land ? */
701                           if (pdp_new[i] < (double)0.0 ) 
702                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
703                           if (pdp_new[i] < (double)0.0 ) 
704                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
705                        }
706                        rate = pdp_new[i] / interval;
707                     }
708                    else {
709                      pdp_new[i]= DNAN;          
710                    }
711                    break;
712                 case DST_ABSOLUTE:
713                     errno = 0;
714                     pdp_new[i] = strtod(updvals[i+1],&endptr);
715                     if (errno > 0){
716                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
717                         break;
718                     };
719                     if (endptr[0] != '\0'){
720                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
721                         break;
722                     }
723                     rate = pdp_new[i] / interval;                 
724                     break;
725                 case DST_GAUGE:
726                     errno = 0;
727                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
728                     if (errno > 0){
729                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
730                         break;
731                     };
732                     if (endptr[0] != '\0'){
733                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
734                         break;
735                     }
736                     rate = pdp_new[i] / interval;                  
737                     break;
738                 default:
739                     rrd_set_error("rrd contains unknown DS type : '%s'",
740                                   rrd.ds_def[i].dst);
741                     break;
742                 }
743                 /* break out of this for loop if the error string is set */
744                 if (rrd_test_error()){
745                     break;
746                 }
747                /* make sure pdp_temp is neither too large or too small
748                 * if any of these occur it becomes unknown ...
749                 * sorry folks ... */
750                if ( ! isnan(rate) && 
751                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
752                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
753                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
754                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
755                   pdp_new[i] = DNAN;
756                }               
757             } else {
758                 /* no news is news all the same */
759                 pdp_new[i] = DNAN;
760             }
761             
762             /* make a copy of the command line argument for the next run */
763 #ifdef DEBUG
764             fprintf(stderr,
765                     "prep ds[%lu]\t"
766                     "last_arg '%s'\t"
767                     "this_arg '%s'\t"
768                     "pdp_new %10.2f\n",
769                     i,
770                     rrd.pdp_prep[i].last_ds,
771                     updvals[i+1], pdp_new[i]);
772 #endif
773             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
774                 strncpy(rrd.pdp_prep[i].last_ds,
775                         updvals[i+1],LAST_DS_LEN-1);
776                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
777             }
778         }
779         /* break out of the argument parsing loop if the error_string is set */
780         if (rrd_test_error()){
781             free(step_start);
782             break;
783         }
784         /* has a pdp_st moment occurred since the last run ? */
785
786         if (proc_pdp_st == occu_pdp_st){
787             /* no we have not passed a pdp_st moment. therefore update is simple */
788
789             for(i=0;i<rrd.stat_head->ds_cnt;i++){
790                 if(isnan(pdp_new[i]))
791                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
792                 else
793                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
794 #ifdef DEBUG
795                 fprintf(stderr,
796                         "NO PDP  ds[%lu]\t"
797                         "value %10.2f\t"
798                         "unkn_sec %5lu\n",
799                         i,
800                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
801                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
802 #endif
803             }   
804         } else {
805             /* an pdp_st has occurred. */
806
807             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
808              * occurred up to the last run.        
809             pdp_new[] contains rate*seconds from the latest run.
810             pdp_temp[] will contain the rate for cdp */
811
812             for(i=0;i<rrd.stat_head->ds_cnt;i++){
813                 /* update pdp_prep to the current pdp_st */
814                 if(isnan(pdp_new[i]))
815                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
816                 else
817                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
818                         pdp_new[i]/(double)interval*(double)pre_int;
819
820                 /* if too much of the pdp_prep is unknown we dump it */
821                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
822                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
823                     (occu_pdp_st-proc_pdp_st <= 
824                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
825                     pdp_temp[i] = DNAN;
826                 } else {
827                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
828                         / (double)( occu_pdp_st
829                                    - proc_pdp_st
830                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
831                 }
832
833                 /* process CDEF data sources; remember each CDEF DS can
834                  * only reference other DS with a lower index number */
835             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
836                    rpnp_t *rpnp;
837                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
838                    /* substitue data values for OP_VARIABLE nodes */
839                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
840                    {
841                           if (rpnp[ii].op == OP_VARIABLE) {
842                                  rpnp[ii].op = OP_NUMBER;
843                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
844                           }
845                    }
846                    /* run the rpn calculator */
847                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
848                           free(rpnp);
849                           break; /* exits the data sources pdp_temp loop */
850                    }
851                 }
852         
853                 /* make pdp_prep ready for the next run */
854                 if(isnan(pdp_new[i])){
855                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
856                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
857                 } else {
858                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
859                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
860                         pdp_new[i]/(double)interval*(double)post_int;
861                 }
862
863 #ifdef DEBUG
864                 fprintf(stderr,
865                         "PDP UPD ds[%lu]\t"
866                         "pdp_temp %10.2f\t"
867                         "new_prep %10.2f\t"
868                         "new_unkn_sec %5lu\n",
869                         i, pdp_temp[i],
870                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
871                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
872 #endif
873             }
874
875                 /* if there were errors during the last loop, bail out here */
876             if (rrd_test_error()){
877                free(step_start);
878                break;
879             }
880
881                 /* compute the number of elapsed pdp_st moments */
882                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
883 #ifdef DEBUG
884                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
885 #endif
886                 if (rra_step_cnt == NULL)
887                 {
888                    rra_step_cnt = (unsigned long *) 
889                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
890                 }
891
892             for(i = 0, rra_start = rra_begin;
893                 i < rrd.stat_head->rra_cnt;
894             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
895                 i++)
896                 {
897                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
898                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
899                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
900         if (start_pdp_offset <= elapsed_pdp_st) {
901            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
902                       rrd.rra_def[i].pdp_cnt + 1;
903             } else {
904                    rra_step_cnt[i] = 0;
905                 }
906
907                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
908                 {
909                    /* If this is a bulk update, we need to skip ahead in the seasonal
910                         * arrays so that they will be correct for the next observed value;
911                         * note that for the bulk update itself, no update will occur to
912                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
913                         * be set to DNAN. */
914            if (rra_step_cnt[i] > 2) 
915                    {
916                           /* skip update by resetting rra_step_cnt[i],
917                            * note that this is not data source specific; this is due
918                            * to the bulk update, not a DNAN value for the specific data
919                            * source. */
920                           rra_step_cnt[i] = 0;
921               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
922                              &last_seasonal_coef);
923                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
924                              &seasonal_coef);
925                    }
926                 
927                   /* periodically run a smoother for seasonal effects */
928                   /* Need to use first cdp parameter buffer to track
929                    * burnin (burnin requires a specific smoothing schedule).
930                    * The CDP_init_seasonal parameter is really an RRA level,
931                    * not a data source within RRA level parameter, but the rra_def
932                    * is read only for rrd_update (not flushed to disk). */
933                   iii = i*(rrd.stat_head -> ds_cnt);
934                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
935                           <= BURNIN_CYCLES)
936                   {
937                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
938                                  > rrd.rra_def[i].row_cnt - 1) {
939                            /* mark off one of the burnin cycles */
940                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
941                        schedule_smooth = 1;
942                          }  
943                   } else {
944                          /* someone has no doubt invented a trick to deal with this
945                           * wrap around, but at least this code is clear. */
946                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
947                              rrd.rra_ptr[i].cur_row)
948                          {
949                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
950                                   * mapping between PDP and CDP */
951                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
952                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
953                                  {
954 #ifdef DEBUG
955                                         fprintf(stderr,
956                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
957                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
958                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
959 #endif
960                                         schedule_smooth = 1;
961                                  }
962              } else {
963                                  /* can't rely on negative numbers because we are working with
964                                   * unsigned values */
965                                  /* Don't need modulus here. If we've wrapped more than once, only
966                                   * one smooth is executed at the end. */
967                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
968                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
969                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
970                                  {
971 #ifdef DEBUG
972                                         fprintf(stderr,
973                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
974                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
975                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
976 #endif
977                                         schedule_smooth = 1;
978                                  }
979                          }
980                   }
981
982               rra_current = ftell(rrd_file); 
983                 } /* if cf is DEVSEASONAL or SEASONAL */
984
985         if (rrd_test_error()) break;
986
987                     /* update CDP_PREP areas */
988                     /* loop over data soures within each RRA */
989                     for(ii = 0;
990                         ii < rrd.stat_head->ds_cnt;
991                         ii++)
992                         {
993                         
994                         /* iii indexes the CDP prep area for this data source within the RRA */
995                         iii=i*rrd.stat_head->ds_cnt+ii;
996
997                         if (rrd.rra_def[i].pdp_cnt > 1) {
998                           
999                            if (rra_step_cnt[i] > 0) {
1000                            /* If we are in this block, as least 1 CDP value will be written to
1001                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1002                                 * to be written, then the "fill in" value is the CDP_secondary_val
1003                                 * entry. */
1004                                   if (isnan(pdp_temp[ii]))
1005                   {
1006                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
1007                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1008                                   } else {
1009                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
1010                                           * CDP data entries. No matter the CF, the value is the same because
1011                                           * the average, max, min, and last of a list of identical values is
1012                                           * the same, namely, the value itself. */
1013                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
1014                                   }
1015                      
1016                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
1017                                       > rrd.rra_def[i].pdp_cnt*
1018                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
1019                                   {
1020                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1021                                          /* initialize carry over */
1022                                          if (current_cf == CF_AVERAGE) {
1023                                                    if (isnan(pdp_temp[ii])) { 
1024                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1025                                                    } else {
1026                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1027                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1028                                                    }
1029                                          } else {
1030                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1031                                          }
1032                                   } else {
1033                                          rrd_value_t cum_val, cur_val; 
1034                                      switch (current_cf) {
1035                                                 case CF_AVERAGE:
1036                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1037                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
1038                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1039                                                (cum_val + cur_val * start_pdp_offset) /
1040                                            (rrd.rra_def[i].pdp_cnt
1041                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1042                                                    /* initialize carry over value */
1043                                                    if (isnan(pdp_temp[ii])) { 
1044                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1045                                                    } else {
1046                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1047                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1048                                                    }
1049                                                    break;
1050                                                 case CF_MAXIMUM:
1051                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1052                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1053 #ifdef DEBUG
1054                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1055                                                           isnan(pdp_temp[ii])) {
1056                                                      fprintf(stderr,
1057                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1058                                                                 i,ii);
1059                                                          exit(-1);
1060                                                   }
1061 #endif
1062                                                   if (cur_val > cum_val)
1063                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1064                                                   else
1065                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1066                                                   /* initialize carry over value */
1067                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068                                                   break;
1069                                                 case CF_MINIMUM:
1070                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1071                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1072 #ifdef DEBUG
1073                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1074                                                           isnan(pdp_temp[ii])) {
1075                                                      fprintf(stderr,
1076                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1077                                                                 i,ii);
1078                                                          exit(-1);
1079                                                   }
1080 #endif
1081                                                   if (cur_val < cum_val)
1082                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1083                                                   else
1084                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1085                                                   /* initialize carry over value */
1086                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1087                                                   break;
1088                                                 case CF_LAST:
1089                                                 default:
1090                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1091                                                    /* initialize carry over value */
1092                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1093                                                 break;
1094                                          }
1095                                   } /* endif meets xff value requirement for a valid value */
1096                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1097                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1098                                   if (isnan(pdp_temp[ii]))
1099                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1100                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1101                                   else
1102                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1103                } else  /* rra_step_cnt[i]  == 0 */
1104                            {
1105 #ifdef DEBUG
1106                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1107                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1108                                          i,ii);
1109                                   } else {
1110                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1111                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1112                                   }
1113 #endif
1114                                   if (isnan(pdp_temp[ii])) {
1115                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1116                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1117                                   {
1118                                          if (current_cf == CF_AVERAGE) {
1119                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1120                                                    elapsed_pdp_st;
1121                                          } else {
1122                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1123                                          }
1124 #ifdef DEBUG
1125                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1126                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1127 #endif
1128                                   } else {
1129                                          switch (current_cf) {
1130                                          case CF_AVERAGE:
1131                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1132                                                    elapsed_pdp_st;
1133                                                 break;
1134                                          case CF_MINIMUM:
1135                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1136                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1137                                                 break; 
1138                                          case CF_MAXIMUM:
1139                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1140                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1141                                                 break; 
1142                                          case CF_LAST:
1143                                          default:
1144                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1145                                                 break;
1146                                          }
1147                                   }
1148                            }
1149                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1150                            if (elapsed_pdp_st > 2)
1151                            {
1152                                    switch (current_cf) {
1153                                    case CF_AVERAGE:
1154                                    default:
1155                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1156                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1157                                           break;
1158                    case CF_SEASONAL:
1159                                    case CF_DEVSEASONAL:
1160                                           /* need to update cached seasonal values, so they are consistent
1161                                            * with the bulk update */
1162                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1163                                            * CDP_last_deviation are the same. */
1164                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1165                                                  last_seasonal_coef[ii];
1166                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1167                                                  seasonal_coef[ii];
1168                                           break;
1169                    case CF_HWPREDICT:
1170                                           /* need to update the null_count and last_null_count.
1171                                            * even do this for non-DNAN pdp_temp because the
1172                                            * algorithm is not learning from batch updates. */
1173                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1174                                                  elapsed_pdp_st;
1175                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1176                                                  elapsed_pdp_st - 1;
1177                                           /* fall through */
1178                                    case CF_DEVPREDICT:
1179                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1180                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1181                                           break;
1182                    case CF_FAILURES:
1183                                           /* do not count missed bulk values as failures */
1184                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1185                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1186                                           /* need to reset violations buffer.
1187                                            * could do this more carefully, but for now, just
1188                                            * assume a bulk update wipes away all violations. */
1189                       erase_violations(&rrd, iii, i);
1190                                           break;
1191                                    }
1192                            } 
1193                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1194
1195                         if (rrd_test_error()) break;
1196
1197                         } /* endif data sources loop */
1198         } /* end RRA Loop */
1199
1200                 /* this loop is only entered if elapsed_pdp_st < 3 */
1201                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1202                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1203                 {
1204                for(i = 0, rra_start = rra_begin;
1205                    i < rrd.stat_head->rra_cnt;
1206                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1207                    i++)
1208                    {
1209                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1210
1211                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1212                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1213                           {
1214                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1215                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1216                                 &seasonal_coef);
1217                  rra_current = ftell(rrd_file);
1218                           }
1219                           if (rrd_test_error()) break;
1220                       /* loop over data soures within each RRA */
1221                       for(ii = 0;
1222                           ii < rrd.stat_head->ds_cnt;
1223                           ii++)
1224                           {
1225                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1226                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1227                                     scratch_idx, seasonal_coef);
1228                           }
1229            } /* end RRA Loop */
1230                    if (rrd_test_error()) break;
1231             } /* end elapsed_pdp_st loop */
1232
1233                 if (rrd_test_error()) break;
1234
1235                 /* Ready to write to disk */
1236                 /* Move sequentially through the file, writing one RRA at a time.
1237                  * Note this architecture divorces the computation of CDP with
1238                  * flushing updated RRA entries to disk. */
1239             for(i = 0, rra_start = rra_begin;
1240                 i < rrd.stat_head->rra_cnt;
1241             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1242                 i++) {
1243                 /* is there anything to write for this RRA? If not, continue. */
1244         if (rra_step_cnt[i] == 0) continue;
1245
1246                 /* write the first row */
1247 #ifdef DEBUG
1248         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1249 #endif
1250             rrd.rra_ptr[i].cur_row++;
1251             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1252                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1253                 /* positition on the first row */
1254                 rra_pos_tmp = rra_start +
1255                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1256                 if(rra_pos_tmp != rra_current) {
1257 #ifndef HAVE_MMAP
1258                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1259                       rrd_set_error("seek error in rrd");
1260                       break;
1261                    }
1262 #endif
1263                    rra_current = rra_pos_tmp;
1264                 }
1265
1266 #ifdef DEBUG
1267             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1268 #endif
1269                 scratch_idx = CDP_primary_val;
1270                 if (pcdp_summary != NULL)
1271                 {
1272                    rra_time = (current_time - current_time 
1273                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1274                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1275                 }
1276 #ifdef HAVE_MMAP
1277                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1278                    pcdp_summary, &rra_time, rrd_mmaped_file);
1279 #else
1280                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1281                    pcdp_summary, &rra_time);
1282 #endif
1283                 if (rrd_test_error()) break;
1284
1285                 /* write other rows of the bulk update, if any */
1286                 scratch_idx = CDP_secondary_val;
1287                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1288                 {
1289                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1290                    {
1291 #ifdef DEBUG
1292               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1293                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1294 #endif
1295                           /* wrap */
1296                           rrd.rra_ptr[i].cur_row = 0;
1297                           /* seek back to beginning of current rra */
1298                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1299                           {
1300                          rrd_set_error("seek error in rrd");
1301                          break;
1302                           }
1303 #ifdef DEBUG
1304                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1305 #endif
1306                           rra_current = rra_start;
1307                    }
1308                    if (pcdp_summary != NULL)
1309                    {
1310                       rra_time = (current_time - current_time 
1311                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1312                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1313                    }
1314 #ifdef HAVE_MMAP
1315                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1316                       pcdp_summary, &rra_time, rrd_mmaped_file);
1317 #else
1318                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1319                       pcdp_summary, &rra_time);
1320 #endif
1321                 }
1322                 
1323                 if (rrd_test_error())
1324                   break;
1325                 } /* RRA LOOP */
1326
1327             /* break out of the argument parsing loop if error_string is set */
1328             if (rrd_test_error()){
1329                    free(step_start);
1330                    break;
1331             } 
1332             
1333         } /* endif a pdp_st has occurred */ 
1334         rrd.live_head->last_up = current_time;
1335         rrd.live_head->last_up_usec = current_time_usec; 
1336         free(step_start);
1337     } /* function argument loop */
1338
1339     if (seasonal_coef != NULL) free(seasonal_coef);
1340     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1341         if (rra_step_cnt != NULL) free(rra_step_cnt);
1342     rpnstack_free(&rpnstack);
1343
1344 #ifdef HAVE_MMAP
1345     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1346             rrd_set_error("error writing(unmapping) file: %s", filename);
1347     }
1348 #endif    
1349     /* if we got here and if there is an error and if the file has not been
1350      * written to, then close things up and return. */
1351     if (rrd_test_error()) {
1352         free(updvals);
1353         free(tmpl_idx);
1354         rrd_free(&rrd);
1355         free(pdp_temp);
1356         free(pdp_new);
1357         fclose(rrd_file);
1358         return(-1);
1359     }
1360
1361     /* aargh ... that was tough ... so many loops ... anyway, its done.
1362      * we just need to write back the live header portion now*/
1363
1364     if (fseek(rrd_file, (sizeof(stat_head_t)
1365                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1366                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1367               SEEK_SET) != 0) {
1368         rrd_set_error("seek rrd for live header writeback");
1369         free(updvals);
1370         free(tmpl_idx);
1371         rrd_free(&rrd);
1372         free(pdp_temp);
1373         free(pdp_new);
1374         fclose(rrd_file);
1375         return(-1);
1376     }
1377
1378     if(version >= 3) {
1379             if(fwrite( rrd.live_head,
1380                        sizeof(live_head_t), 1, rrd_file) != 1){
1381                 rrd_set_error("fwrite live_head to rrd");
1382                 free(updvals);
1383                 rrd_free(&rrd);
1384                 free(tmpl_idx);
1385                 free(pdp_temp);
1386                 free(pdp_new);
1387                 fclose(rrd_file);
1388                 return(-1);
1389             }
1390     }
1391     else {
1392             if(fwrite( &rrd.live_head->last_up,
1393                        sizeof(time_t), 1, rrd_file) != 1){
1394                 rrd_set_error("fwrite live_head to rrd");
1395                 free(updvals);
1396                 rrd_free(&rrd);
1397                 free(tmpl_idx);
1398                 free(pdp_temp);
1399                 free(pdp_new);
1400                 fclose(rrd_file);
1401                 return(-1);
1402             }
1403     }
1404             
1405
1406     if(fwrite( rrd.pdp_prep,
1407                sizeof(pdp_prep_t),
1408                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1409         rrd_set_error("ftwrite pdp_prep to rrd");
1410         free(updvals);
1411         rrd_free(&rrd);
1412         free(tmpl_idx);
1413         free(pdp_temp);
1414         free(pdp_new);
1415         fclose(rrd_file);
1416         return(-1);
1417     }
1418
1419     if(fwrite( rrd.cdp_prep,
1420                sizeof(cdp_prep_t),
1421                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1422        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1423
1424         rrd_set_error("ftwrite cdp_prep to rrd");
1425         free(updvals);
1426         free(tmpl_idx);
1427         rrd_free(&rrd);
1428         free(pdp_temp);
1429         free(pdp_new);
1430         fclose(rrd_file);
1431         return(-1);
1432     }
1433
1434     if(fwrite( rrd.rra_ptr,
1435                sizeof(rra_ptr_t), 
1436                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1437         rrd_set_error("fwrite rra_ptr to rrd");
1438         free(updvals);
1439         free(tmpl_idx);
1440         rrd_free(&rrd);
1441         free(pdp_temp);
1442         free(pdp_new);
1443         fclose(rrd_file);
1444         return(-1);
1445     }
1446
1447     /* OK now close the files and free the memory */
1448     if(fclose(rrd_file) != 0){
1449         rrd_set_error("closing rrd");
1450         free(updvals);
1451         free(tmpl_idx);
1452         rrd_free(&rrd);
1453         free(pdp_temp);
1454         free(pdp_new);
1455         return(-1);
1456     }
1457
1458     /* calling the smoothing code here guarantees at most
1459          * one smoothing operation per rrd_update call. Unfortunately,
1460          * it is possible with bulk updates, or a long-delayed update
1461          * for smoothing to occur off-schedule. This really isn't
1462          * critical except during the burning cycles. */
1463         if (schedule_smooth)
1464         {
1465 #ifndef WIN32
1466           rrd_file = fopen(filename,"r+");
1467 #else
1468           rrd_file = fopen(filename,"rb+");
1469 #endif
1470           rra_start = rra_begin;
1471           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1472           {
1473             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1474                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1475             {
1476 #ifdef DEBUG
1477               fprintf(stderr,"Running smoother for rra %ld\n",i);
1478 #endif
1479               apply_smoother(&rrd,i,rra_start,rrd_file);
1480               if (rrd_test_error())
1481                 break;
1482             }
1483             rra_start += rrd.rra_def[i].row_cnt
1484               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1485           }
1486           fclose(rrd_file);
1487         }
1488     rrd_free(&rrd);
1489     free(updvals);
1490     free(tmpl_idx);
1491     free(pdp_new);
1492     free(pdp_temp);
1493     return(0);
1494 }
1495
1496 /*
1497  * get exclusive lock to whole file.
1498  * lock gets removed when we close the file
1499  *
1500  * returns 0 on success
1501  */
1502 int
1503 LockRRD(FILE *rrdfile)
1504 {
1505     int rrd_fd;         /* File descriptor for RRD */
1506     int rcstat;
1507
1508     rrd_fd = fileno(rrdfile);
1509
1510         {
1511 #ifndef WIN32    
1512     struct flock        lock;
1513     lock.l_type = F_WRLCK;    /* exclusive write lock */
1514     lock.l_len = 0;           /* whole file */
1515     lock.l_start = 0;         /* start of file */
1516     lock.l_whence = SEEK_SET;   /* end of file */
1517
1518     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1519 #else
1520     struct _stat st;
1521
1522     if ( _fstat( rrd_fd, &st ) == 0 ) {
1523             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1524     } else {
1525             rcstat = -1;
1526     }
1527 #endif
1528         }
1529
1530     return(rcstat);
1531 }
1532
1533
1534 #ifdef HAVE_MMAP
1535 info_t
1536 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1537                unsigned short CDP_scratch_idx, FILE *rrd_file,
1538                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1539 #else
1540 info_t
1541 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1542                unsigned short CDP_scratch_idx, FILE *rrd_file,
1543                    info_t *pcdp_summary, time_t *rra_time)
1544 #endif
1545 {
1546    unsigned long ds_idx, cdp_idx;
1547    infoval iv;
1548   
1549    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1550    {
1551       /* compute the cdp index */
1552       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1553 #ifdef DEBUG
1554           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1555              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1556              rrd -> rra_def[rra_idx].cf_nam);
1557 #endif 
1558       if (pcdp_summary != NULL)
1559           {
1560              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1561              /* append info to the return hash */
1562                  pcdp_summary = info_push(pcdp_summary,
1563                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1564                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1565                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1566          RD_I_VAL, iv);
1567           }
1568 #ifdef HAVE_MMAP
1569           memcpy((char *)rrd_mmaped_file + *rra_current,
1570                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1571                           sizeof(rrd_value_t));
1572 #else
1573           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1574                  sizeof(rrd_value_t),1,rrd_file) != 1)
1575           { 
1576              rrd_set_error("writing rrd");
1577              return 0;
1578           }
1579 #endif
1580           *rra_current += sizeof(rrd_value_t);
1581         }
1582         return (pcdp_summary);
1583 }