Updates from Bernhard Fischer rep dot nop gmail com
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
19
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
22
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
24 /*
25  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
26  * replacement.
27  */
28 #include <sys/timeb.h>
29
30 #ifndef __MINGW32__
31 struct timeval {
32     time_t    tv_sec;   /* seconds */
33     long      tv_usec;  /* microseconds */
34 };
35 #endif
36
37 struct __timezone {
38     int       tz_minuteswest;   /* minutes W of Greenwich */
39     int       tz_dsttime;   /* type of dst correction */
40 };
41
42 static int gettimeofday(
43     struct timeval *t,
44     struct __timezone *tz)
45 {
46
47     struct _timeb current_time;
48
49     _ftime(&current_time);
50
51     t->tv_sec = current_time.time;
52     t->tv_usec = current_time.millitm * 1000;
53
54     return 0;
55 }
56
57 #endif
58 /*
59  * normilize time as returned by gettimeofday. usec part must
60  * be always >= 0
61  */
62 static void normalize_time(
63     struct timeval *t)
64 {
65     if (t->tv_usec < 0) {
66         t->tv_sec--;
67         t->tv_usec += 1000000L;
68     }
69 }
70
71 static info_t *write_RRA_row(
72     rrd_file_t *rrd_file,
73     rrd_t *rrd,
74     unsigned long rra_idx,
75     unsigned long *rra_current,
76     unsigned short CDP_scratch_idx,
77     info_t *pcdp_summary,
78     time_t *rra_time)
79 {
80     unsigned long ds_idx, cdp_idx;
81     infoval   iv;
82
83     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84         /* compute the cdp index */
85         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91         if (pcdp_summary != NULL) {
92             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93             /* append info to the return hash */
94             pcdp_summary = info_push(pcdp_summary,
95                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96                                                    *rra_time,
97                                                    rrd->rra_def[rra_idx].
98                                                    cf_nam,
99                                                    rrd->rra_def[rra_idx].
100                                                    pdp_cnt,
101                                                    rrd->ds_def[ds_idx].
102                                                    ds_nam), RD_I_VAL, iv);
103         }
104         if (rrd_write
105             (rrd_file,
106              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107              sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
108             rrd_set_error("writing rrd");
109             return 0;
110         }
111         *rra_current += sizeof(rrd_value_t);
112     }
113     return (pcdp_summary);
114 }
115
116 int       rrd_update_r(
117     const char *filename,
118     const char *tmplt,
119     int argc,
120     const char **argv);
121 int       _rrd_update(
122     const char *filename,
123     const char *tmplt,
124     int argc,
125     const char **argv,
126     info_t *);
127
128 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
129
130
131 info_t   *rrd_update_v(
132     int argc,
133     char **argv)
134 {
135     char     *tmplt = NULL;
136     info_t   *result = NULL;
137     infoval   rc;
138
139     rc.u_int = -1;
140     optind = 0;
141     opterr = 0;         /* initialize getopt */
142
143     while (1) {
144         static struct option long_options[] = {
145             {"template", required_argument, 0, 't'},
146             {0, 0, 0, 0}
147         };
148         int       option_index = 0;
149         int       opt;
150
151         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
152
153         if (opt == EOF)
154             break;
155
156         switch (opt) {
157         case 't':
158             tmplt = optarg;
159             break;
160
161         case '?':
162             rrd_set_error("unknown option '%s'", argv[optind - 1]);
163             goto end_tag;
164         }
165     }
166
167     /* need at least 2 arguments: filename, data. */
168     if (argc - optind < 2) {
169         rrd_set_error("Not enough arguments");
170         goto end_tag;
171     }
172     rc.u_int = 0;
173     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174     rc.u_int = _rrd_update(argv[optind], tmplt,
175                            argc - optind - 1,
176                            (const char **) (argv + optind + 1), result);
177     result->value.u_int = rc.u_int;
178   end_tag:
179     return result;
180 }
181
182 int rrd_update(
183     int argc,
184     char **argv)
185 {
186     char     *tmplt = NULL;
187     int       rc;
188
189     optind = 0;
190     opterr = 0;         /* initialize getopt */
191
192     while (1) {
193         static struct option long_options[] = {
194             {"template", required_argument, 0, 't'},
195             {0, 0, 0, 0}
196         };
197         int       option_index = 0;
198         int       opt;
199
200         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
201
202         if (opt == EOF)
203             break;
204
205         switch (opt) {
206         case 't':
207             tmplt = optarg;
208             break;
209
210         case '?':
211             rrd_set_error("unknown option '%s'", argv[optind - 1]);
212             return (-1);
213         }
214     }
215
216     /* need at least 2 arguments: filename, data. */
217     if (argc - optind < 2) {
218         rrd_set_error("Not enough arguments");
219
220         return -1;
221     }
222
223     rc = rrd_update_r(argv[optind], tmplt,
224                       argc - optind - 1, (const char **) (argv + optind + 1));
225     return rc;
226 }
227
228 int rrd_update_r(
229     const char *filename,
230     const char *tmplt,
231     int argc,
232     const char **argv)
233 {
234     return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
236
237 int _rrd_update(
238     const char *filename,
239     const char *tmplt,
240     int argc,
241     const char **argv,
242     info_t *pcdp_summary)
243 {
244
245     int       arg_i = 2;
246     short     j;
247     unsigned long i, ii, iii = 1;
248
249     unsigned long rra_begin;    /* byte pointer to the rra
250                                  * area in the rrd file.  this
251                                  * pointer never changes value */
252     unsigned long rra_start;    /* byte pointer to the rra
253                                  * area in the rrd file.  this
254                                  * pointer changes as each rrd is
255                                  * processed. */
256     unsigned long rra_current;  /* byte pointer to the current write
257                                  * spot in the rrd file. */
258     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
259     double    interval, pre_int, post_int;  /* interval between this and
260                                              * the last run */
261     unsigned long proc_pdp_st;  /* which pdp_st was the last
262                                  * to be processed */
263     unsigned long occu_pdp_st;  /* when was the pdp_st
264                                  * before the last update
265                                  * time */
266     unsigned long proc_pdp_age; /* how old was the data in
267                                  * the pdp prep area when it
268                                  * was last updated */
269     unsigned long occu_pdp_age; /* how long ago was the last
270                                  * pdp_step time */
271     rrd_value_t *pdp_new;   /* prepare the incoming data
272                              * to be added the the
273                              * existing entry */
274     rrd_value_t *pdp_temp;  /* prepare the pdp values 
275                              * to be added the the
276                              * cdp values */
277
278     long     *tmpl_idx; /* index representing the settings
279                            transported by the tmplt index */
280     unsigned long tmpl_cnt = 2; /* time and data */
281
282     rrd_t     rrd;
283     time_t    current_time = 0;
284     time_t    rra_time = 0; /* time of update for a RRA */
285     unsigned long current_time_usec = 0;    /* microseconds part of current time */
286     struct timeval tmp_time;    /* used for time conversion */
287
288     char    **updvals;
289     int       schedule_smooth = 0;
290     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
291
292     /* a vector of future Holt-Winters seasonal coefs */
293     unsigned long elapsed_pdp_st;
294
295     /* number of elapsed PDP steps since last update */
296     unsigned long *rra_step_cnt = NULL;
297
298     /* number of rows to be updated in an RRA for a data
299      * value. */
300     unsigned long start_pdp_offset;
301
302     /* number of PDP steps since the last update that
303      * are assigned to the first CDP to be generated
304      * since the last update. */
305     unsigned short scratch_idx;
306
307     /* index into the CDP scratch array */
308     enum cf_en current_cf;
309
310     /* numeric id of the current consolidation function */
311     rpnstack_t rpnstack;    /* used for COMPUTE DS */
312     int       version;  /* rrd version */
313     char     *endptr;   /* used in the conversion */
314     rrd_file_t *rrd_file;
315
316     rpnstack_init(&rpnstack);
317
318     /* need at least 1 arguments: data. */
319     if (argc < 1) {
320         rrd_set_error("Not enough arguments");
321         return -1;
322     }
323
324     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325     if (rrd_file == NULL) {
326         return -1;
327     }
328
329     /* initialize time */
330     version = atoi(rrd.stat_head->version);
331     gettimeofday(&tmp_time, 0);
332     normalize_time(&tmp_time);
333     current_time = tmp_time.tv_sec;
334     if (version >= 3) {
335         current_time_usec = tmp_time.tv_usec;
336     } else {
337         current_time_usec = 0;
338     }
339
340     rra_current = rra_start = rra_begin = rrd_file->header_len;
341     /* This is defined in the ANSI C standard, section 7.9.5.3:
342
343        When a file is opened with udpate mode ('+' as the second
344        or third character in the ... list of mode argument
345        variables), both input and output may be performed on the
346        associated stream.  However, ...  input may not be directly
347        followed by output without an intervening call to a file
348        positioning function, unless the input operation encounters
349        end-of-file. */
350 #if 0                   //def HAVE_MMAP
351     rrd_filesize = rrd_file->file_size;
352     fseek(rrd_file->fd, 0, SEEK_END);
353     rrd_filesize = ftell(rrd_file->fd);
354     fseek(rrd_file->fd, rra_current, SEEK_SET);
355 #else
356 //    fseek(rrd_file->fd, 0, SEEK_CUR);
357 #endif
358
359
360     /* get exclusive lock to whole file.
361      * lock gets removed when we close the file.
362      */
363     if (LockRRD(rrd_file->fd) != 0) {
364         rrd_set_error("could not lock RRD");
365         rrd_free(&rrd);
366         close(rrd_file->fd);
367         return (-1);
368     }
369
370     if ((updvals =
371          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
372         rrd_set_error("allocating updvals pointer array");
373         rrd_free(&rrd);
374         close(rrd_file->fd);
375         return (-1);
376     }
377
378     if ((pdp_temp = malloc(sizeof(rrd_value_t)
379                            * rrd.stat_head->ds_cnt)) == NULL) {
380         rrd_set_error("allocating pdp_temp ...");
381         free(updvals);
382         rrd_free(&rrd);
383         close(rrd_file->fd);
384         return (-1);
385     }
386
387     if ((tmpl_idx = malloc(sizeof(unsigned long)
388                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
389         rrd_set_error("allocating tmpl_idx ...");
390         free(pdp_temp);
391         free(updvals);
392         rrd_free(&rrd);
393         close(rrd_file->fd);
394         return (-1);
395     }
396     /* initialize tmplt redirector */
397     /* default config example (assume DS 1 is a CDEF DS)
398        tmpl_idx[0] -> 0; (time)
399        tmpl_idx[1] -> 1; (DS 0)
400        tmpl_idx[2] -> 3; (DS 2)
401        tmpl_idx[3] -> 4; (DS 3) */
402     tmpl_idx[0] = 0;    /* time */
403     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
404         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
405             tmpl_idx[ii++] = i;
406     }
407     tmpl_cnt = ii;
408
409     if (tmplt) {
410         /* we should work on a writeable copy here */
411         char     *dsname;
412         unsigned int tmpl_len;
413         char     *tmplt_copy = strdup(tmplt);
414
415         dsname = tmplt_copy;
416         tmpl_cnt = 1;   /* the first entry is the time */
417         tmpl_len = strlen(tmplt_copy);
418         for (i = 0; i <= tmpl_len; i++) {
419             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
420                 tmplt_copy[i] = '\0';
421                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
422                     rrd_set_error
423                         ("tmplt contains more DS definitions than RRD");
424                     free(updvals);
425                     free(pdp_temp);
426                     free(tmpl_idx);
427                     rrd_free(&rrd);
428                     close(rrd_file->fd);
429                     return (-1);
430                 }
431                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
432                     rrd_set_error("unknown DS name '%s'", dsname);
433                     free(updvals);
434                     free(pdp_temp);
435                     free(tmplt_copy);
436                     free(tmpl_idx);
437                     rrd_free(&rrd);
438                     close(rrd_file->fd);
439                     return (-1);
440                 } else {
441                     /* the first element is always the time */
442                     tmpl_idx[tmpl_cnt - 1]++;
443                     /* go to the next entry on the tmplt_copy */
444                     dsname = &tmplt_copy[i + 1];
445                     /* fix the damage we did before */
446                     if (i < tmpl_len) {
447                         tmplt_copy[i] = ':';
448                     }
449
450                 }
451             }
452         }
453         free(tmplt_copy);
454     }
455     if ((pdp_new = malloc(sizeof(rrd_value_t)
456                           * rrd.stat_head->ds_cnt)) == NULL) {
457         rrd_set_error("allocating pdp_new ...");
458         free(updvals);
459         free(pdp_temp);
460         free(tmpl_idx);
461         rrd_free(&rrd);
462         close(rrd_file->fd);
463         return (-1);
464     }
465 #if 0                   //def HAVE_MMAP
466     rrd_mmaped_file = mmap(0,
467                            rrd_file->file_len,
468                            PROT_READ | PROT_WRITE,
469                            MAP_SHARED, fileno(in_file), 0);
470     if (rrd_mmaped_file == MAP_FAILED) {
471         rrd_set_error("error mmapping file %s", filename);
472         free(updvals);
473         free(pdp_temp);
474         free(tmpl_idx);
475         rrd_free(&rrd);
476         close(rrd_file->fd);
477         return (-1);
478     }
479 #ifdef USE_MADVISE
480     /* when we use mmaping we tell the kernel the mmap equivalent
481        of POSIX_FADV_RANDOM */
482     madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
483 #endif
484 #endif
485     /* loop through the arguments. */
486     for (arg_i = 0; arg_i < argc; arg_i++) {
487         char     *stepper = strdup(argv[arg_i]);
488         char     *step_start = stepper;
489         char     *p;
490         char     *parsetime_error = NULL;
491         enum { atstyle, normal } timesyntax;
492         struct rrd_time_value ds_tv;
493
494         if (stepper == NULL) {
495             rrd_set_error("failed duplication argv entry");
496             free(step_start);
497             free(updvals);
498             free(pdp_temp);
499             free(tmpl_idx);
500             rrd_free(&rrd);
501 #ifdef HAVE_MMAP
502             rrd_close(rrd_file);
503 #endif
504             close(rrd_file->fd);
505             return (-1);
506         }
507         /* initialize all ds input to unknown except the first one
508            which has always got to be set */
509         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
510             updvals[ii] = "U";
511         updvals[0] = stepper;
512         /* separate all ds elements; first must be examined separately
513            due to alternate time syntax */
514         if ((p = strchr(stepper, '@')) != NULL) {
515             timesyntax = atstyle;
516             *p = '\0';
517             stepper = p + 1;
518         } else if ((p = strchr(stepper, ':')) != NULL) {
519             timesyntax = normal;
520             *p = '\0';
521             stepper = p + 1;
522         } else {
523             rrd_set_error
524                 ("expected timestamp not found in data source from %s",
525                  argv[arg_i]);
526             free(step_start);
527             break;
528         }
529         ii = 1;
530         updvals[tmpl_idx[ii]] = stepper;
531         while (*stepper) {
532             if (*stepper == ':') {
533                 *stepper = '\0';
534                 ii++;
535                 if (ii < tmpl_cnt) {
536                     updvals[tmpl_idx[ii]] = stepper + 1;
537                 }
538             }
539             stepper++;
540         }
541
542         if (ii != tmpl_cnt - 1) {
543             rrd_set_error
544                 ("expected %lu data source readings (got %lu) from %s",
545                  tmpl_cnt - 1, ii, argv[arg_i]);
546             free(step_start);
547             break;
548         }
549
550         /* get the time from the reading ... handle N */
551         if (timesyntax == atstyle) {
552             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
553                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
554                 free(step_start);
555                 break;
556             }
557             if (ds_tv.type == RELATIVE_TO_END_TIME ||
558                 ds_tv.type == RELATIVE_TO_START_TIME) {
559                 rrd_set_error("specifying time relative to the 'start' "
560                               "or 'end' makes no sense here: %s", updvals[0]);
561                 free(step_start);
562                 break;
563             }
564
565             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
566
567             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
568
569         } else if (strcmp(updvals[0], "N") == 0) {
570             gettimeofday(&tmp_time, 0);
571             normalize_time(&tmp_time);
572             current_time = tmp_time.tv_sec;
573             current_time_usec = tmp_time.tv_usec;
574         } else {
575             double    tmp;
576
577             tmp = strtod(updvals[0], 0);
578             current_time = floor(tmp);
579             current_time_usec =
580                 (long) ((tmp - (double) current_time) * 1000000.0);
581         }
582         /* dont do any correction for old version RRDs */
583         if (version < 3)
584             current_time_usec = 0;
585
586         if (current_time < rrd.live_head->last_up ||
587             (current_time == rrd.live_head->last_up &&
588              (long) current_time_usec <=
589              (long) rrd.live_head->last_up_usec)) {
590             rrd_set_error("illegal attempt to update using time %ld when "
591                           "last update time is %ld (minimum one second step)",
592                           current_time, rrd.live_head->last_up);
593             free(step_start);
594             break;
595         }
596
597
598         /* seek to the beginning of the rra's */
599         if (rra_current != rra_begin) {
600 #ifndef HAVE_MMAP
601             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
602                 rrd_set_error("seek error in rrd");
603                 free(step_start);
604                 break;
605             }
606 #endif
607             rra_current = rra_begin;
608         }
609         rra_start = rra_begin;
610
611         /* when was the current pdp started */
612         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
613         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
614
615         /* when did the last pdp_st occur */
616         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
617         occu_pdp_st = current_time - occu_pdp_age;
618
619         /* interval = current_time - rrd.live_head->last_up; */
620         interval = (double) (current_time - rrd.live_head->last_up)
621             + (double) ((long) current_time_usec -
622                         (long) rrd.live_head->last_up_usec) / 1000000.0;
623
624         if (occu_pdp_st > proc_pdp_st) {
625             /* OK we passed the pdp_st moment */
626             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
627                                                                      * occurred before the latest
628                                                                      * pdp_st moment*/
629             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
630             post_int = occu_pdp_age;    /* how much after it */
631             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
632         } else {
633             pre_int = interval;
634             post_int = 0;
635         }
636
637 #ifdef DEBUG
638         printf("proc_pdp_age %lu\t"
639                "proc_pdp_st %lu\t"
640                "occu_pfp_age %lu\t"
641                "occu_pdp_st %lu\t"
642                "int %lf\t"
643                "pre_int %lf\t"
644                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
645                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
646 #endif
647
648         /* process the data sources and update the pdp_prep 
649          * area accordingly */
650         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
651             enum dst_en dst_idx;
652
653             dst_idx = dst_conv(rrd.ds_def[i].dst);
654
655             /* make sure we do not build diffs with old last_ds values */
656             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
657                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
658                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
659             }
660
661             /* NOTE: DST_CDEF should never enter this if block, because
662              * updvals[i+1][0] is initialized to 'U'; unless the caller
663              * accidently specified a value for the DST_CDEF. To handle 
664              * this case, an extra check is required. */
665
666             if ((updvals[i + 1][0] != 'U') &&
667                 (dst_idx != DST_CDEF) &&
668                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
669                 double    rate = DNAN;
670
671                 /* the data source type defines how to process the data */
672                 /* pdp_new contains rate * time ... eg the bytes
673                  * transferred during the interval. Doing it this way saves
674                  * a lot of math operations */
675
676
677                 switch (dst_idx) {
678                 case DST_COUNTER:
679                 case DST_DERIVE:
680                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
681                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
682                             if ((updvals[i + 1][ii] < '0'
683                                  || updvals[i + 1][ii] > '9') && (ii != 0
684                                                                   && updvals[i
685                                                                              +
686                                                                              1]
687                                                                   [ii] !=
688                                                                   '-')) {
689                                 rrd_set_error("not a simple integer: '%s'",
690                                               updvals[i + 1]);
691                                 break;
692                             }
693                         }
694                         if (rrd_test_error()) {
695                             break;
696                         }
697                         pdp_new[i] =
698                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
699                         if (dst_idx == DST_COUNTER) {
700                             /* simple overflow catcher suggested by Andres Kroonmaa */
701                             /* this will fail terribly for non 32 or 64 bit counters ... */
702                             /* are there any others in SNMP land ? */
703                             if (pdp_new[i] < (double) 0.0)
704                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
705                             if (pdp_new[i] < (double) 0.0)
706                                 pdp_new[i] += (double) 18446744069414584320.0;
707                             /* 2^64-2^32 */ ;
708                         }
709                         rate = pdp_new[i] / interval;
710                     } else {
711                         pdp_new[i] = DNAN;
712                     }
713                     break;
714                 case DST_ABSOLUTE:
715                     errno = 0;
716                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
717                     if (errno > 0) {
718                         rrd_set_error("converting  '%s' to float: %s",
719                                       updvals[i + 1], rrd_strerror(errno));
720                         break;
721                     };
722                     if (endptr[0] != '\0') {
723                         rrd_set_error
724                             ("conversion of '%s' to float not complete: tail '%s'",
725                              updvals[i + 1], endptr);
726                         break;
727                     }
728                     rate = pdp_new[i] / interval;
729                     break;
730                 case DST_GAUGE:
731                     errno = 0;
732                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
733                     if (errno > 0) {
734                         rrd_set_error("converting  '%s' to float: %s",
735                                       updvals[i + 1], rrd_strerror(errno));
736                         break;
737                     };
738                     if (endptr[0] != '\0') {
739                         rrd_set_error
740                             ("conversion of '%s' to float not complete: tail '%s'",
741                              updvals[i + 1], endptr);
742                         break;
743                     }
744                     rate = pdp_new[i] / interval;
745                     break;
746                 default:
747                     rrd_set_error("rrd contains unknown DS type : '%s'",
748                                   rrd.ds_def[i].dst);
749                     break;
750                 }
751                 /* break out of this for loop if the error string is set */
752                 if (rrd_test_error()) {
753                     break;
754                 }
755                 /* make sure pdp_temp is neither too large or too small
756                  * if any of these occur it becomes unknown ...
757                  * sorry folks ... */
758                 if (!isnan(rate) &&
759                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
760                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
761                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
762                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
763                     pdp_new[i] = DNAN;
764                 }
765             } else {
766                 /* no news is news all the same */
767                 pdp_new[i] = DNAN;
768             }
769
770
771             /* make a copy of the command line argument for the next run */
772 #ifdef DEBUG
773             fprintf(stderr,
774                     "prep ds[%lu]\t"
775                     "last_arg '%s'\t"
776                     "this_arg '%s'\t"
777                     "pdp_new %10.2f\n",
778                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
779 #endif
780             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
781             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
782         }
783         /* break out of the argument parsing loop if the error_string is set */
784         if (rrd_test_error()) {
785             free(step_start);
786             break;
787         }
788         /* has a pdp_st moment occurred since the last run ? */
789
790         if (proc_pdp_st == occu_pdp_st) {
791             /* no we have not passed a pdp_st moment. therefore update is simple */
792
793             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
794                 if (isnan(pdp_new[i])) {
795                     /* this is not realy accurate if we use subsecond data arival time
796                        should have thought of it when going subsecond resolution ...
797                        sorry next format change we will have it! */
798                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
799                         floor(interval);
800                 } else {
801                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
802                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
803                     } else {
804                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
805                     }
806                 }
807 #ifdef DEBUG
808                 fprintf(stderr,
809                         "NO PDP  ds[%lu]\t"
810                         "value %10.2f\t"
811                         "unkn_sec %5lu\n",
812                         i,
813                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
814                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
815 #endif
816             }
817         } else {
818             /* an pdp_st has occurred. */
819
820             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
821              * occurred up to the last run.        
822              pdp_new[] contains rate*seconds from the latest run.
823              pdp_temp[] will contain the rate for cdp */
824
825             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
826                 /* update pdp_prep to the current pdp_st. */
827                 double    pre_unknown = 0.0;
828
829                 if (isnan(pdp_new[i]))
830                     /* a final bit of unkonwn to be added bevore calculation
831                      * we use a tempaorary variable for this so that we 
832                      * don't have to turn integer lines before using the value */
833                     pre_unknown = pre_int;
834                 else {
835                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
836                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
837                             pdp_new[i] / interval * pre_int;
838                     } else {
839                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
840                             pdp_new[i] / interval * pre_int;
841                     }
842                 }
843
844
845                 /* if too much of the pdp_prep is unknown we dump it */
846                 if (
847                        /* removed because this does not agree with the definition
848                           a heart beat can be unknown */
849                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
850                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
851                        /* if the interval is larger thatn mrhb we get NAN */
852                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
853                        (occu_pdp_st - proc_pdp_st <=
854                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
855                     pdp_temp[i] = DNAN;
856                 } else {
857                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
858                         / ((double) (occu_pdp_st - proc_pdp_st
859                                      -
860                                      rrd.pdp_prep[i].
861                                      scratch[PDP_unkn_sec_cnt].u_cnt)
862                            - pre_unknown);
863                 }
864
865                 /* process CDEF data sources; remember each CDEF DS can
866                  * only reference other DS with a lower index number */
867                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
868                     rpnp_t   *rpnp;
869
870                     rpnp =
871                         rpn_expand((rpn_cdefds_t *) &
872                                    (rrd.ds_def[i].par[DS_cdef]));
873                     /* substitue data values for OP_VARIABLE nodes */
874                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
875                         if (rpnp[ii].op == OP_VARIABLE) {
876                             rpnp[ii].op = OP_NUMBER;
877                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
878                         }
879                     }
880                     /* run the rpn calculator */
881                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
882                         free(rpnp);
883                         break;  /* exits the data sources pdp_temp loop */
884                     }
885                 }
886
887                 /* make pdp_prep ready for the next run */
888                 if (isnan(pdp_new[i])) {
889                     /* this is not realy accurate if we use subsecond data arival time
890                        should have thought of it when going subsecond resolution ...
891                        sorry next format change we will have it! */
892                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
893                         floor(post_int);
894                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
895                 } else {
896                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
897                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
898                         pdp_new[i] / interval * post_int;
899                 }
900
901 #ifdef DEBUG
902                 fprintf(stderr,
903                         "PDP UPD ds[%lu]\t"
904                         "pdp_temp %10.2f\t"
905                         "new_prep %10.2f\t"
906                         "new_unkn_sec %5lu\n",
907                         i, pdp_temp[i],
908                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
909                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
910 #endif
911             }
912
913             /* if there were errors during the last loop, bail out here */
914             if (rrd_test_error()) {
915                 free(step_start);
916                 break;
917             }
918
919             /* compute the number of elapsed pdp_st moments */
920             elapsed_pdp_st =
921                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
922 #ifdef DEBUG
923             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
924 #endif
925             if (rra_step_cnt == NULL) {
926                 rra_step_cnt = (unsigned long *)
927                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
928             }
929
930             for (i = 0, rra_start = rra_begin;
931                  i < rrd.stat_head->rra_cnt;
932                  rra_start +=
933                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
934                  sizeof(rrd_value_t), i++) {
935                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
937                     (proc_pdp_st / rrd.stat_head->pdp_step) %
938                     rrd.rra_def[i].pdp_cnt;
939                 if (start_pdp_offset <= elapsed_pdp_st) {
940                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
941                         rrd.rra_def[i].pdp_cnt + 1;
942                 } else {
943                     rra_step_cnt[i] = 0;
944                 }
945
946                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
947                     /* If this is a bulk update, we need to skip ahead in the seasonal
948                      * arrays so that they will be correct for the next observed value;
949                      * note that for the bulk update itself, no update will occur to
950                      * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
951                      * be set to DNAN. */
952                     if (rra_step_cnt[i] > 2) {
953                         /* skip update by resetting rra_step_cnt[i],
954                          * note that this is not data source specific; this is due
955                          * to the bulk update, not a DNAN value for the specific data
956                          * source. */
957                         rra_step_cnt[i] = 0;
958                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
959                                         elapsed_pdp_st, &last_seasonal_coef);
960                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
961                                         elapsed_pdp_st + 1, &seasonal_coef);
962                     }
963
964                     /* periodically run a smoother for seasonal effects */
965                     /* Need to use first cdp parameter buffer to track
966                      * burnin (burnin requires a specific smoothing schedule).
967                      * The CDP_init_seasonal parameter is really an RRA level,
968                      * not a data source within RRA level parameter, but the rra_def
969                      * is read only for rrd_update (not flushed to disk). */
970                     iii = i * (rrd.stat_head->ds_cnt);
971                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
972                         <= BURNIN_CYCLES) {
973                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
974                             > rrd.rra_def[i].row_cnt - 1) {
975                             /* mark off one of the burnin cycles */
976                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
977                                u_cnt);
978                             schedule_smooth = 1;
979                         }
980                     } else {
981                         /* someone has no doubt invented a trick to deal with this
982                          * wrap around, but at least this code is clear. */
983                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
984                             u_cnt > rrd.rra_ptr[i].cur_row) {
985                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
986                              * mapping between PDP and CDP */
987                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
988                                 >=
989                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
990                                 u_cnt) {
991 #ifdef DEBUG
992                                 fprintf(stderr,
993                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
994                                         rrd.rra_ptr[i].cur_row,
995                                         elapsed_pdp_st,
996                                         rrd.rra_def[i].
997                                         par[RRA_seasonal_smooth_idx].u_cnt);
998 #endif
999                                 schedule_smooth = 1;
1000                             }
1001                         } else {
1002                             /* can't rely on negative numbers because we are working with
1003                              * unsigned values */
1004                             /* Don't need modulus here. If we've wrapped more than once, only
1005                              * one smooth is executed at the end. */
1006                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
1007                                 rrd.rra_def[i].row_cnt
1008                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
1009                                 rrd.rra_def[i].row_cnt >=
1010                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
1011                                 u_cnt) {
1012 #ifdef DEBUG
1013                                 fprintf(stderr,
1014                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1015                                         rrd.rra_ptr[i].cur_row,
1016                                         elapsed_pdp_st,
1017                                         rrd.rra_def[i].
1018                                         par[RRA_seasonal_smooth_idx].u_cnt);
1019 #endif
1020                                 schedule_smooth = 1;
1021                             }
1022                         }
1023                     }
1024
1025                     rra_current = rrd_tell(rrd_file);
1026                 }
1027                 /* if cf is DEVSEASONAL or SEASONAL */
1028                 if (rrd_test_error())
1029                     break;
1030
1031                 /* update CDP_PREP areas */
1032                 /* loop over data soures within each RRA */
1033                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1034
1035                     /* iii indexes the CDP prep area for this data source within the RRA */
1036                     iii = i * rrd.stat_head->ds_cnt + ii;
1037
1038                     if (rrd.rra_def[i].pdp_cnt > 1) {
1039
1040                         if (rra_step_cnt[i] > 0) {
1041                             /* If we are in this block, as least 1 CDP value will be written to
1042                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
1043                              * to be written, then the "fill in" value is the CDP_secondary_val
1044                              * entry. */
1045                             if (isnan(pdp_temp[ii])) {
1046                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1047                                     u_cnt += start_pdp_offset;
1048                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1049                                     u_val = DNAN;
1050                             } else {
1051                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
1052                                  * CDP data entries. No matter the CF, the value is the same because
1053                                  * the average, max, min, and last of a list of identical values is
1054                                  * the same, namely, the value itself. */
1055                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1056                                     u_val = pdp_temp[ii];
1057                             }
1058
1059                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1060                                 u_cnt >
1061                                 rrd.rra_def[i].pdp_cnt *
1062                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1063                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1064                                     u_val = DNAN;
1065                                 /* initialize carry over */
1066                                 if (current_cf == CF_AVERAGE) {
1067                                     if (isnan(pdp_temp[ii])) {
1068                                         rrd.cdp_prep[iii].scratch[CDP_val].
1069                                             u_val = DNAN;
1070                                     } else {
1071                                         rrd.cdp_prep[iii].scratch[CDP_val].
1072                                             u_val =
1073                                             pdp_temp[ii] *
1074                                             ((elapsed_pdp_st -
1075                                               start_pdp_offset) %
1076                                              rrd.rra_def[i].pdp_cnt);
1077                                     }
1078                                 } else {
1079                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1080                                         pdp_temp[ii];
1081                                 }
1082                             } else {
1083                                 rrd_value_t cum_val, cur_val;
1084
1085                                 switch (current_cf) {
1086                                 case CF_AVERAGE:
1087                                     cum_val =
1088                                         IFDNAN(rrd.cdp_prep[iii].
1089                                                scratch[CDP_val].u_val, 0.0);
1090                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1091                                     rrd.cdp_prep[iii].
1092                                         scratch[CDP_primary_val].u_val =
1093                                         (cum_val +
1094                                          cur_val * start_pdp_offset) /
1095                                         (rrd.rra_def[i].pdp_cnt -
1096                                          rrd.cdp_prep[iii].
1097                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1098                                     /* initialize carry over value */
1099                                     if (isnan(pdp_temp[ii])) {
1100                                         rrd.cdp_prep[iii].scratch[CDP_val].
1101                                             u_val = DNAN;
1102                                     } else {
1103                                         rrd.cdp_prep[iii].scratch[CDP_val].
1104                                             u_val =
1105                                             pdp_temp[ii] *
1106                                             ((elapsed_pdp_st -
1107                                               start_pdp_offset) %
1108                                              rrd.rra_def[i].pdp_cnt);
1109                                     }
1110                                     break;
1111                                 case CF_MAXIMUM:
1112                                     cum_val =
1113                                         IFDNAN(rrd.cdp_prep[iii].
1114                                                scratch[CDP_val].u_val, -DINF);
1115                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1116 #ifdef DEBUG
1117                                     if (isnan
1118                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1119                                          u_val) && isnan(pdp_temp[ii])) {
1120                                         fprintf(stderr,
1121                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1122                                                 i, ii);
1123                                         exit(-1);
1124                                     }
1125 #endif
1126                                     if (cur_val > cum_val)
1127                                         rrd.cdp_prep[iii].
1128                                             scratch[CDP_primary_val].u_val =
1129                                             cur_val;
1130                                     else
1131                                         rrd.cdp_prep[iii].
1132                                             scratch[CDP_primary_val].u_val =
1133                                             cum_val;
1134                                     /* initialize carry over value */
1135                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1136                                         pdp_temp[ii];
1137                                     break;
1138                                 case CF_MINIMUM:
1139                                     cum_val =
1140                                         IFDNAN(rrd.cdp_prep[iii].
1141                                                scratch[CDP_val].u_val, DINF);
1142                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1143 #ifdef DEBUG
1144                                     if (isnan
1145                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1146                                          u_val) && isnan(pdp_temp[ii])) {
1147                                         fprintf(stderr,
1148                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1149                                                 i, ii);
1150                                         exit(-1);
1151                                     }
1152 #endif
1153                                     if (cur_val < cum_val)
1154                                         rrd.cdp_prep[iii].
1155                                             scratch[CDP_primary_val].u_val =
1156                                             cur_val;
1157                                     else
1158                                         rrd.cdp_prep[iii].
1159                                             scratch[CDP_primary_val].u_val =
1160                                             cum_val;
1161                                     /* initialize carry over value */
1162                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1163                                         pdp_temp[ii];
1164                                     break;
1165                                 case CF_LAST:
1166                                 default:
1167                                     rrd.cdp_prep[iii].
1168                                         scratch[CDP_primary_val].u_val =
1169                                         pdp_temp[ii];
1170                                     /* initialize carry over value */
1171                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1172                                         pdp_temp[ii];
1173                                     break;
1174                                 }
1175                             }   /* endif meets xff value requirement for a valid value */
1176                             /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1177                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1178                             if (isnan(pdp_temp[ii]))
1179                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1180                                     u_cnt =
1181                                     (elapsed_pdp_st -
1182                                      start_pdp_offset) %
1183                                     rrd.rra_def[i].pdp_cnt;
1184                             else
1185                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1186                                     u_cnt = 0;
1187                         } else {    /* rra_step_cnt[i]  == 0 */
1188
1189 #ifdef DEBUG
1190                             if (isnan
1191                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1192                                 fprintf(stderr,
1193                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1194                                         i, ii);
1195                             } else {
1196                                 fprintf(stderr,
1197                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1198                                         i, ii,
1199                                         rrd.cdp_prep[iii].scratch[CDP_val].
1200                                         u_val);
1201                             }
1202 #endif
1203                             if (isnan(pdp_temp[ii])) {
1204                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1205                                     u_cnt += elapsed_pdp_st;
1206                             } else
1207                                 if (isnan
1208                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1209                                      u_val)) {
1210                                 if (current_cf == CF_AVERAGE) {
1211                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1212                                         pdp_temp[ii] * elapsed_pdp_st;
1213                                 } else {
1214                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1215                                         pdp_temp[ii];
1216                                 }
1217 #ifdef DEBUG
1218                                 fprintf(stderr,
1219                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1220                                         i, ii,
1221                                         rrd.cdp_prep[iii].scratch[CDP_val].
1222                                         u_val);
1223 #endif
1224                             } else {
1225                                 switch (current_cf) {
1226                                 case CF_AVERAGE:
1227                                     rrd.cdp_prep[iii].scratch[CDP_val].
1228                                         u_val +=
1229                                         pdp_temp[ii] * elapsed_pdp_st;
1230                                     break;
1231                                 case CF_MINIMUM:
1232                                     if (pdp_temp[ii] <
1233                                         rrd.cdp_prep[iii].scratch[CDP_val].
1234                                         u_val)
1235                                         rrd.cdp_prep[iii].scratch[CDP_val].
1236                                             u_val = pdp_temp[ii];
1237                                     break;
1238                                 case CF_MAXIMUM:
1239                                     if (pdp_temp[ii] >
1240                                         rrd.cdp_prep[iii].scratch[CDP_val].
1241                                         u_val)
1242                                         rrd.cdp_prep[iii].scratch[CDP_val].
1243                                             u_val = pdp_temp[ii];
1244                                     break;
1245                                 case CF_LAST:
1246                                 default:
1247                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1248                                         pdp_temp[ii];
1249                                     break;
1250                                 }
1251                             }
1252                         }
1253                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1254                         if (elapsed_pdp_st > 2) {
1255                             switch (current_cf) {
1256                             case CF_AVERAGE:
1257                             default:
1258                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1259                                     u_val = pdp_temp[ii];
1260                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1261                                     u_val = pdp_temp[ii];
1262                                 break;
1263                             case CF_SEASONAL:
1264                             case CF_DEVSEASONAL:
1265                                 /* need to update cached seasonal values, so they are consistent
1266                                  * with the bulk update */
1267                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1268                                  * CDP_last_deviation are the same. */
1269                                 rrd.cdp_prep[iii].
1270                                     scratch[CDP_hw_last_seasonal].u_val =
1271                                     last_seasonal_coef[ii];
1272                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1273                                     u_val = seasonal_coef[ii];
1274                                 break;
1275                             case CF_HWPREDICT:
1276                                 /* need to update the null_count and last_null_count.
1277                                  * even do this for non-DNAN pdp_temp because the
1278                                  * algorithm is not learning from batch updates. */
1279                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1280                                     u_cnt += elapsed_pdp_st;
1281                                 rrd.cdp_prep[iii].
1282                                     scratch[CDP_last_null_count].u_cnt +=
1283                                     elapsed_pdp_st - 1;
1284                                 /* fall through */
1285                             case CF_DEVPREDICT:
1286                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1287                                     u_val = DNAN;
1288                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1289                                     u_val = DNAN;
1290                                 break;
1291                             case CF_FAILURES:
1292                                 /* do not count missed bulk values as failures */
1293                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1294                                     u_val = 0;
1295                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1296                                     u_val = 0;
1297                                 /* need to reset violations buffer.
1298                                  * could do this more carefully, but for now, just
1299                                  * assume a bulk update wipes away all violations. */
1300                                 erase_violations(&rrd, iii, i);
1301                                 break;
1302                             }
1303                         }
1304                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1305
1306                     if (rrd_test_error())
1307                         break;
1308
1309                 }       /* endif data sources loop */
1310             }           /* end RRA Loop */
1311
1312             /* this loop is only entered if elapsed_pdp_st < 3 */
1313             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1314                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1315                 for (i = 0, rra_start = rra_begin;
1316                      i < rrd.stat_head->rra_cnt;
1317                      rra_start +=
1318                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1319                      sizeof(rrd_value_t), i++) {
1320                     if (rrd.rra_def[i].pdp_cnt > 1)
1321                         continue;
1322
1323                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1324                     if (current_cf == CF_SEASONAL
1325                         || current_cf == CF_DEVSEASONAL) {
1326                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1327                                         elapsed_pdp_st + (scratch_idx ==
1328                                                           CDP_primary_val ? 1
1329                                                           : 2),
1330                                         &seasonal_coef);
1331                         rra_current = rrd_tell(rrd_file);
1332                     }
1333                     if (rrd_test_error())
1334                         break;
1335                     /* loop over data soures within each RRA */
1336                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1337                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1338                                            i * (rrd.stat_head->ds_cnt) + ii,
1339                                            i, ii, scratch_idx, seasonal_coef);
1340                     }
1341                 }       /* end RRA Loop */
1342                 if (rrd_test_error())
1343                     break;
1344             }           /* end elapsed_pdp_st loop */
1345
1346             if (rrd_test_error())
1347                 break;
1348
1349             /* Ready to write to disk */
1350             /* Move sequentially through the file, writing one RRA at a time.
1351              * Note this architecture divorces the computation of CDP with
1352              * flushing updated RRA entries to disk. */
1353             for (i = 0, rra_start = rra_begin;
1354                  i < rrd.stat_head->rra_cnt;
1355                  rra_start +=
1356                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1357                  sizeof(rrd_value_t), i++) {
1358                 /* is th5Aere anything to write for this RRA? If not, continue. */
1359                 if (rra_step_cnt[i] == 0)
1360                     continue;
1361
1362                 /* write the first row */
1363 #ifdef DEBUG
1364                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1365 #endif
1366                 rrd.rra_ptr[i].cur_row++;
1367                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1368                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1369                 /* positition on the first row */
1370                 rra_pos_tmp = rra_start +
1371                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1372                     sizeof(rrd_value_t);
1373                 if (rra_pos_tmp != rra_current) {
1374 #ifndef HAVE_MMAP
1375                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1376                         rrd_set_error("seek error in rrd");
1377                         break;
1378                     }
1379 #endif
1380                     rra_current = rra_pos_tmp;
1381                 }
1382 #ifdef DEBUG
1383                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1384 #endif
1385                 scratch_idx = CDP_primary_val;
1386                 if (pcdp_summary != NULL) {
1387                     rra_time = (current_time - current_time
1388                                 % (rrd.rra_def[i].pdp_cnt *
1389                                    rrd.stat_head->pdp_step))
1390                         -
1391                         ((rra_step_cnt[i] -
1392                           1) * rrd.rra_def[i].pdp_cnt *
1393                          rrd.stat_head->pdp_step);
1394                 }
1395                 pcdp_summary =
1396                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1397                                   scratch_idx, pcdp_summary, &rra_time);
1398                 if (rrd_test_error())
1399                     break;
1400
1401                 /* write other rows of the bulk update, if any */
1402                 scratch_idx = CDP_secondary_val;
1403                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1404                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1405 #ifdef DEBUG
1406                         fprintf(stderr,
1407                                 "Wraparound for RRA %s, %lu updates left\n",
1408                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1409 #endif
1410                         /* wrap */
1411                         rrd.rra_ptr[i].cur_row = 0;
1412                         /* seek back to beginning of current rra */
1413                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1414                             rrd_set_error("seek error in rrd");
1415                             break;
1416                         }
1417 #ifdef DEBUG
1418                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1419                                 rrd_file->pos);
1420 #endif
1421                         rra_current = rra_start;
1422                     }
1423                     if (pcdp_summary != NULL) {
1424                         rra_time = (current_time - current_time
1425                                     % (rrd.rra_def[i].pdp_cnt *
1426                                        rrd.stat_head->pdp_step))
1427                             -
1428                             ((rra_step_cnt[i] -
1429                               2) * rrd.rra_def[i].pdp_cnt *
1430                              rrd.stat_head->pdp_step);
1431                     }
1432                     pcdp_summary =
1433                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1434                                       scratch_idx, pcdp_summary, &rra_time);
1435                 }
1436
1437                 if (rrd_test_error())
1438                     break;
1439             }           /* RRA LOOP */
1440
1441             /* break out of the argument parsing loop if error_string is set */
1442             if (rrd_test_error()) {
1443                 free(step_start);
1444                 break;
1445             }
1446
1447         }               /* endif a pdp_st has occurred */
1448         rrd.live_head->last_up = current_time;
1449         rrd.live_head->last_up_usec = current_time_usec;
1450         free(step_start);
1451     }                   /* function argument loop */
1452
1453     if (seasonal_coef != NULL)
1454         free(seasonal_coef);
1455     if (last_seasonal_coef != NULL)
1456         free(last_seasonal_coef);
1457     if (rra_step_cnt != NULL)
1458         free(rra_step_cnt);
1459     rpnstack_free(&rpnstack);
1460
1461 #ifdef HAVE_MMAP
1462     if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1463         rrd_set_error("error writing(unmapping) file: %s", filename);
1464     }
1465 #endif
1466     /* if we got here and if there is an error and if the file has not been
1467      * written to, then close things up and return. */
1468     if (rrd_test_error()) {
1469         free(updvals);
1470         free(tmpl_idx);
1471         rrd_free(&rrd);
1472         free(pdp_temp);
1473         free(pdp_new);
1474         close(rrd_file->fd);
1475         return (-1);
1476     }
1477
1478     /* aargh ... that was tough ... so many loops ... anyway, its done.
1479      * we just need to write back the live header portion now*/
1480
1481     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1482                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1483                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1484                  SEEK_SET) != 0) {
1485         rrd_set_error("seek rrd for live header writeback");
1486         free(updvals);
1487         free(tmpl_idx);
1488         rrd_free(&rrd);
1489         free(pdp_temp);
1490         free(pdp_new);
1491         close(rrd_file->fd);
1492         return (-1);
1493     }
1494
1495     if (version >= 3) {
1496         if (rrd_write(rrd_file, rrd.live_head,
1497                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1498             rrd_set_error("rrd_write live_head to rrd");
1499             free(updvals);
1500             rrd_free(&rrd);
1501             free(tmpl_idx);
1502             free(pdp_temp);
1503             free(pdp_new);
1504             close(rrd_file->fd);
1505             return (-1);
1506         }
1507     } else {
1508         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1509                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1510             rrd_set_error("rrd_write live_head to rrd");
1511             free(updvals);
1512             rrd_free(&rrd);
1513             free(tmpl_idx);
1514             free(pdp_temp);
1515             free(pdp_new);
1516             close(rrd_file->fd);
1517             return (-1);
1518         }
1519     }
1520
1521
1522     if (rrd_write(rrd_file, rrd.pdp_prep,
1523                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1524         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1525         rrd_set_error("rrd_write pdp_prep to rrd");
1526         free(updvals);
1527         rrd_free(&rrd);
1528         free(tmpl_idx);
1529         free(pdp_temp);
1530         free(pdp_new);
1531         close(rrd_file->fd);
1532         return (-1);
1533     }
1534
1535     if (rrd_write(rrd_file, rrd.cdp_prep,
1536                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1537                   rrd.stat_head->ds_cnt)
1538         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539                       rrd.stat_head->ds_cnt)) {
1540
1541         rrd_set_error("rrd_write cdp_prep to rrd");
1542         free(updvals);
1543         free(tmpl_idx);
1544         rrd_free(&rrd);
1545         free(pdp_temp);
1546         free(pdp_new);
1547         close(rrd_file->fd);
1548         return (-1);
1549     }
1550
1551     if (rrd_write(rrd_file, rrd.rra_ptr,
1552                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1553         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1554         rrd_set_error("rrd_write rra_ptr to rrd");
1555         free(updvals);
1556         free(tmpl_idx);
1557         rrd_free(&rrd);
1558         free(pdp_temp);
1559         free(pdp_new);
1560         close(rrd_file->fd);
1561         return (-1);
1562     }
1563 #ifdef HAVE_POSIX_FADVISExxx
1564
1565     /* with update we have write ops, so they will probably not be done by now, this means
1566        the buffers will not get freed. But calling this for the whole file - header
1567        will let the data off the hook as soon as it is written when if it is from a previous
1568        update cycle. Calling fdsync to force things is much too hard here. */
1569
1570     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1571         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1572                       rrd_strerror(errno));
1573         close(rrd_file->fd);
1574         return (-1);
1575     }
1576 #endif
1577     /*XXX: ? */ rrd_flush(rrd_file);
1578
1579     /* calling the smoothing code here guarantees at most
1580      * one smoothing operation per rrd_update call. Unfortunately,
1581      * it is possible with bulk updates, or a long-delayed update
1582      * for smoothing to occur off-schedule. This really isn't
1583      * critical except during the burning cycles. */
1584     if (schedule_smooth) {
1585 //    in_file = fopen(filename,"rb+");
1586
1587
1588         rra_start = rra_begin;
1589         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1590             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1591                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1592 #ifdef DEBUG
1593                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1594 #endif
1595                 apply_smoother(&rrd, i, rra_start, rrd_file);
1596                 if (rrd_test_error())
1597                     break;
1598             }
1599             rra_start += rrd.rra_def[i].row_cnt
1600                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1601         }
1602 #ifdef HAVE_POSIX_FADVISExxx
1603         /* same procedure as above ... */
1604         if (0 !=
1605             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1606             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1607                           rrd_strerror(errno));
1608             close(rrd_file->fd);
1609             return (-1);
1610         }
1611 #endif
1612         close(rrd_file->fd);
1613     }
1614
1615     /* OK now close the files and free the memory */
1616     if (close(rrd_file->fd) != 0) {
1617         rrd_set_error("closing rrd");
1618         free(updvals);
1619         free(tmpl_idx);
1620         rrd_free(&rrd);
1621         free(pdp_temp);
1622         free(pdp_new);
1623         return (-1);
1624     }
1625
1626     rrd_free(&rrd);
1627     free(updvals);
1628     free(tmpl_idx);
1629     free(pdp_new);
1630     free(pdp_temp);
1631     return (0);
1632 }
1633
1634 /*
1635  * get exclusive lock to whole file.
1636  * lock gets removed when we close the file
1637  *
1638  * returns 0 on success
1639  */
1640 int LockRRD(
1641     int in_file)
1642 {
1643     int       rcstat;
1644
1645     {
1646 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1647         struct _stat st;
1648
1649         if (_fstat(in_file, &st) == 0) {
1650             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1651         } else {
1652             rcstat = -1;
1653         }
1654 #else
1655         struct flock lock;
1656
1657         lock.l_type = F_WRLCK;  /* exclusive write lock */
1658         lock.l_len = 0; /* whole file */
1659         lock.l_start = 0;   /* start of file */
1660         lock.l_whence = SEEK_SET;   /* end of file */
1661
1662         rcstat = fcntl(in_file, F_SETLK, &lock);
1663 #endif
1664     }
1665
1666     return (rcstat);
1667 }