make sure we check input even when the previous update was a 'U' ... and some indenti...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
/* Stand-in for struct timeval, used when the toolchain is not MinGW. */
struct timeval {
    time_t    tv_sec;   /* whole seconds */
    long      tv_usec;  /* microseconds on top of tv_sec */
};
37 #endif
38
/* Timezone record taken by the local gettimeofday() replacement. */
struct __timezone {
    int       tz_minuteswest;   /* offset west of Greenwich, in minutes */
    int       tz_dsttime;   /* kind of DST correction */
};
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
/*
 * Bring a timeval into the form where the microsecond part is not
 * negative: a negative tv_usec borrows one second from tv_sec.
 * Only a single second is borrowed.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;         /* already in normal form */
    }
    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
72
73 static inline info_t *write_RRA_row(
74     rrd_file_t *rrd_file,
75     rrd_t *rrd,
76     unsigned long rra_idx,
77     unsigned long *rra_current,
78     unsigned short CDP_scratch_idx,
79     info_t *pcdp_summary,
80     time_t *rra_time)
81 {
82     unsigned long ds_idx, cdp_idx;
83     infoval   iv;
84
85     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86         /* compute the cdp index */
87         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
88 #ifdef DEBUG
89         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
92 #endif
93         if (pcdp_summary != NULL) {
94             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95             /* append info to the return hash */
96             pcdp_summary = info_push(pcdp_summary,
97                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
98                                                    *rra_time,
99                                                    rrd->rra_def[rra_idx].
100                                                    cf_nam,
101                                                    rrd->rra_def[rra_idx].
102                                                    pdp_cnt,
103                                                    rrd->ds_def[ds_idx].
104                                                    ds_nam), RD_I_VAL, iv);
105         }
106         if (rrd_write
107             (rrd_file,
108              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111             return 0;
112         }
113         *rra_current += sizeof(rrd_value_t);
114     }
115     return (pcdp_summary);
116 }
117
118 int       rrd_update_r(
119     const char *filename,
120     const char *tmplt,
121     int argc,
122     const char **argv);
123 int       _rrd_update(
124     const char *filename,
125     const char *tmplt,
126     int argc,
127     const char **argv,
128     info_t *);
129
/*
 * Yield X, or Y when X is NaN.
 * The expansion is a plain parenthesized expression with no trailing
 * semicolon, so it can be used inside larger expressions. X is evaluated
 * twice; do not pass arguments with side effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
131
132
133 info_t   *rrd_update_v(
134     int argc,
135     char **argv)
136 {
137     char     *tmplt = NULL;
138     info_t   *result = NULL;
139     infoval   rc;
140     struct option long_options[] = {
141         {"template", required_argument, 0, 't'},
142         {0, 0, 0, 0}
143     };
144
145     rc.u_int = -1;
146     optind = 0;
147     opterr = 0;         /* initialize getopt */
148
149     while (1) {
150         int       option_index = 0;
151         int       opt;
152
153         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
154
155         if (opt == EOF)
156             break;
157
158         switch (opt) {
159         case 't':
160             tmplt = optarg;
161             break;
162
163         case '?':
164             rrd_set_error("unknown option '%s'", argv[optind - 1]);
165             goto end_tag;
166         }
167     }
168
169     /* need at least 2 arguments: filename, data. */
170     if (argc - optind < 2) {
171         rrd_set_error("Not enough arguments");
172         goto end_tag;
173     }
174     rc.u_int = 0;
175     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176     rc.u_int = _rrd_update(argv[optind], tmplt,
177                            argc - optind - 1,
178                            (const char **) (argv + optind + 1), result);
179     result->value.u_int = rc.u_int;
180   end_tag:
181     return result;
182 }
183
/*
 * Command-line style update entry point: parses "-t/--template", then
 * passes the file name and the data arguments to rrd_update_r().
 *
 * argc, argv - argument vector; after option parsing at least a file name
 *              and one data argument must remain.
 *
 * Returns the result of rrd_update_r(), or -1 (with the error recorded
 * through rrd_set_error()) when an option is unknown, too few arguments
 * remain, or the template string cannot be duplicated.
 *
 * The private copy of the template is released on every exit path.
 */
int rrd_update(
    int argc,
    char **argv)
{
    struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    int       option_index = 0;
    int       opt;
    char     *tmplt = NULL;
    int       rc = -1;

    optind = 0;
    opterr = 0;         /* initialize getopt */

    while (1) {
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == EOF)
            break;

        switch (opt) {
        case 't':
            /* a repeated -t replaces the earlier template; drop the old copy */
            free(tmplt);
            tmplt = strdup(optarg);
            if (tmplt == NULL) {
                rrd_set_error("allocating template copy");
                goto out;
            }
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            goto out;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        goto out;
    }

    rc = rrd_update_r(argv[optind], tmplt,
                      argc - optind - 1, (const char **) (argv + optind + 1));
  out:
    free(tmplt);        /* free(NULL) is a no-op */
    return rc;
}
229
/*
 * Update an RRD file without collecting a summary: forwards all arguments
 * to _rrd_update() with a NULL info list and returns its result unchanged.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    int       status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
238
239 int _rrd_update(
240     const char *filename,
241     const char *tmplt,
242     int argc,
243     const char **argv,
244     info_t *pcdp_summary)
245 {
246
247     int       arg_i = 2;
248     short     j;
249     unsigned long i, ii, iii = 1;
250
251     unsigned long rra_begin;    /* byte pointer to the rra
252                                  * area in the rrd file.  this
253                                  * pointer never changes value */
254     unsigned long rra_start;    /* byte pointer to the rra
255                                  * area in the rrd file.  this
256                                  * pointer changes as each rrd is
257                                  * processed. */
258     unsigned long rra_current;  /* byte pointer to the current write
259                                  * spot in the rrd file. */
260     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
261     double    interval, pre_int, post_int;  /* interval between this and
262                                              * the last run */
263     unsigned long proc_pdp_st;  /* which pdp_st was the last
264                                  * to be processed */
265     unsigned long occu_pdp_st;  /* when was the pdp_st
266                                  * before the last update
267                                  * time */
268     unsigned long proc_pdp_age; /* how old was the data in
269                                  * the pdp prep area when it
270                                  * was last updated */
271     unsigned long occu_pdp_age; /* how long ago was the last
272                                  * pdp_step time */
273     rrd_value_t *pdp_new;   /* prepare the incoming data
274                              * to be added the the
275                              * existing entry */
276     rrd_value_t *pdp_temp;  /* prepare the pdp values
277                              * to be added the the
278                              * cdp values */
279
280     long     *tmpl_idx; /* index representing the settings
281                            transported by the tmplt index */
282     unsigned long tmpl_cnt = 2; /* time and data */
283
284     rrd_t     rrd;
285     time_t    current_time = 0;
286     time_t    rra_time = 0; /* time of update for a RRA */
287     unsigned long current_time_usec = 0;    /* microseconds part of current time */
288     struct timeval tmp_time;    /* used for time conversion */
289
290     char    **updvals;
291     int       schedule_smooth = 0;
292     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
293
294     /* a vector of future Holt-Winters seasonal coefs */
295     unsigned long elapsed_pdp_st;
296
297     /* number of elapsed PDP steps since last update */
298     unsigned long *rra_step_cnt = NULL;
299
300     /* number of rows to be updated in an RRA for a data
301      * value. */
302     unsigned long start_pdp_offset;
303
304     /* number of PDP steps since the last update that
305      * are assigned to the first CDP to be generated
306      * since the last update. */
307     unsigned short scratch_idx;
308
309     /* index into the CDP scratch array */
310     enum cf_en current_cf;
311
312     /* numeric id of the current consolidation function */
313     rpnstack_t rpnstack;    /* used for COMPUTE DS */
314     int       version;  /* rrd version */
315     char     *endptr;   /* used in the conversion */
316     rrd_file_t *rrd_file;
317
318     rpnstack_init(&rpnstack);
319
320     /* need at least 1 arguments: data. */
321     if (argc < 1) {
322         rrd_set_error("Not enough arguments");
323         goto err_out;
324     }
325
326     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327     if (rrd_file == NULL) {
328         goto err_free;
329     }
330     /* We are now at the beginning of the rra's */
331     rra_current = rra_start = rra_begin = rrd_file->header_len;
332
333     /* initialize time */
334     version = atoi(rrd.stat_head->version);
335     gettimeofday(&tmp_time, 0);
336     normalize_time(&tmp_time);
337     current_time = tmp_time.tv_sec;
338     if (version >= 3) {
339         current_time_usec = tmp_time.tv_usec;
340     } else {
341         current_time_usec = 0;
342     }
343
344     /* get exclusive lock to whole file.
345      * lock gets removed when we close the file.
346      */
347     if (LockRRD(rrd_file->fd) != 0) {
348         rrd_set_error("could not lock RRD");
349         goto err_close;
350     }
351
352     if ((updvals =
353          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354         rrd_set_error("allocating updvals pointer array");
355         goto err_close;
356     }
357
358     if ((pdp_temp = malloc(sizeof(rrd_value_t)
359                            * rrd.stat_head->ds_cnt)) == NULL) {
360         rrd_set_error("allocating pdp_temp ...");
361         goto err_free_updvals;
362     }
363
364     if ((tmpl_idx = malloc(sizeof(unsigned long)
365                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366         rrd_set_error("allocating tmpl_idx ...");
367         goto err_free_pdp_temp;
368     }
369     /* initialize tmplt redirector */
370     /* default config example (assume DS 1 is a CDEF DS)
371        tmpl_idx[0] -> 0; (time)
372        tmpl_idx[1] -> 1; (DS 0)
373        tmpl_idx[2] -> 3; (DS 2)
374        tmpl_idx[3] -> 4; (DS 3) */
375     tmpl_idx[0] = 0;    /* time */
376     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
378             tmpl_idx[ii++] = i;
379     }
380     tmpl_cnt = ii;
381
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char     *dsname;
385         unsigned int tmpl_len;
386         char     *tmplt_copy = strdup(tmplt);
387
388         dsname = tmplt_copy;
389         tmpl_cnt = 1;   /* the first entry is the time */
390         tmpl_len = strlen(tmplt_copy);
391         for (i = 0; i <= tmpl_len; i++) {
392             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393                 tmplt_copy[i] = '\0';
394                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
395                     rrd_set_error
396                         ("tmplt contains more DS definitions than RRD");
397                     goto err_free_tmpl_idx;
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400                     rrd_set_error("unknown DS name '%s'", dsname);
401                     goto err_free_tmpl_idx;
402                 } else {
403                     /* the first element is always the time */
404                     tmpl_idx[tmpl_cnt - 1]++;
405                     /* go to the next entry on the tmplt_copy */
406                     dsname = &tmplt_copy[i + 1];
407                     /* fix the damage we did before */
408                     if (i < tmpl_len) {
409                         tmplt_copy[i] = ':';
410                     }
411
412                 }
413             }
414         }
415         free(tmplt_copy);
416     }
417     if ((pdp_new = malloc(sizeof(rrd_value_t)
418                           * rrd.stat_head->ds_cnt)) == NULL) {
419         rrd_set_error("allocating pdp_new ...");
420         goto err_free_tmpl_idx;
421     }
422     /* loop through the arguments. */
423     for (arg_i = 0; arg_i < argc; arg_i++) {
424         char     *stepper = strdup(argv[arg_i]);
425         char     *step_start = stepper;
426         char     *p;
427         char     *parsetime_error = NULL;
428         enum { atstyle, normal } timesyntax;
429         struct rrd_time_value ds_tv;
430
431         if (stepper == NULL) {
432             rrd_set_error("failed duplication argv entry");
433             free(step_start);
434             goto err_free_pdp_new;
435         }
436         /* initialize all ds input to unknown except the first one
437            which has always got to be set */
438         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
439             updvals[ii] = "U";
440         updvals[0] = stepper;
441         /* separate all ds elements; first must be examined separately
442            due to alternate time syntax */
443         if ((p = strchr(stepper, '@')) != NULL) {
444             timesyntax = atstyle;
445             *p = '\0';
446             stepper = p + 1;
447         } else if ((p = strchr(stepper, ':')) != NULL) {
448             timesyntax = normal;
449             *p = '\0';
450             stepper = p + 1;
451         } else {
452             rrd_set_error
453                 ("expected timestamp not found in data source from %s",
454                  argv[arg_i]);
455             free(step_start);
456             break;
457         }
458         ii = 1;
459         updvals[tmpl_idx[ii]] = stepper;
460         while (*stepper) {
461             if (*stepper == ':') {
462                 *stepper = '\0';
463                 ii++;
464                 if (ii < tmpl_cnt) {
465                     updvals[tmpl_idx[ii]] = stepper + 1;
466                 }
467             }
468             stepper++;
469         }
470
471         if (ii != tmpl_cnt - 1) {
472             rrd_set_error
473                 ("expected %lu data source readings (got %lu) from %s",
474                  tmpl_cnt - 1, ii, argv[arg_i]);
475             free(step_start);
476             break;
477         }
478
479         /* get the time from the reading ... handle N */
480         if (timesyntax == atstyle) {
481             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483                 free(step_start);
484                 break;
485             }
486             if (ds_tv.type == RELATIVE_TO_END_TIME ||
487                 ds_tv.type == RELATIVE_TO_START_TIME) {
488                 rrd_set_error("specifying time relative to the 'start' "
489                               "or 'end' makes no sense here: %s", updvals[0]);
490                 free(step_start);
491                 break;
492             }
493
494             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
495
496             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
497
498         } else if (strcmp(updvals[0], "N") == 0) {
499             gettimeofday(&tmp_time, 0);
500             normalize_time(&tmp_time);
501             current_time = tmp_time.tv_sec;
502             current_time_usec = tmp_time.tv_usec;
503         } else {
504             double    tmp;
505             char     *old_locale;
506
507             old_locale = setlocale(LC_NUMERIC, "C");
508             tmp = strtod(updvals[0], 0);
509             setlocale(LC_NUMERIC, old_locale);
510             current_time = floor(tmp);
511             current_time_usec =
512                 (long) ((tmp - (double) current_time) * 1000000.0);
513         }
514         /* dont do any correction for old version RRDs */
515         if (version < 3)
516             current_time_usec = 0;
517
518         if (current_time < rrd.live_head->last_up ||
519             (current_time == rrd.live_head->last_up &&
520              (long) current_time_usec <=
521              (long) rrd.live_head->last_up_usec)) {
522             rrd_set_error("illegal attempt to update using time %ld when "
523                           "last update time is %ld (minimum one second step)",
524                           current_time, rrd.live_head->last_up);
525             free(step_start);
526             break;
527         }
528
529         /* seek to the beginning of the rra's */
530         if (rra_current != rra_begin) {
531 #ifndef HAVE_MMAP
532             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
533                 rrd_set_error("seek error in rrd");
534                 free(step_start);
535                 break;
536             }
537 #endif
538             rra_current = rra_begin;
539         }
540         rra_start = rra_begin;
541
542         /* when was the current pdp started */
543         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
544         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
545
546         /* when did the last pdp_st occur */
547         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
548         occu_pdp_st = current_time - occu_pdp_age;
549
550         /* interval = current_time - rrd.live_head->last_up; */
551         interval = (double) (current_time - rrd.live_head->last_up)
552             + (double) ((long) current_time_usec -
553                         (long) rrd.live_head->last_up_usec) / 1000000.0;
554
555         if (occu_pdp_st > proc_pdp_st) {
556             /* OK we passed the pdp_st moment */
557             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
558                                                                      * occurred before the latest
559                                                                      * pdp_st moment*/
560             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
561             post_int = occu_pdp_age;    /* how much after it */
562             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
563         } else {
564             pre_int = interval;
565             post_int = 0;
566         }
567
568 #ifdef DEBUG
569         printf("proc_pdp_age %lu\t"
570                "proc_pdp_st %lu\t"
571                "occu_pfp_age %lu\t"
572                "occu_pdp_st %lu\t"
573                "int %lf\t"
574                "pre_int %lf\t"
575                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
576                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
577 #endif
578
579         /* process the data sources and update the pdp_prep 
580          * area accordingly */
581         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
582             enum dst_en dst_idx;
583
584             dst_idx = dst_conv(rrd.ds_def[i].dst);
585
586             /* make sure we do not build diffs with old last_ds values */
587             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
588                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
589                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
590             }
591
592             /* NOTE: DST_CDEF should never enter this if block, because
593              * updvals[i+1][0] is initialized to 'U'; unless the caller
594              * accidently specified a value for the DST_CDEF. To handle
595              * this case, an extra check is required. */
596
597             if ((updvals[i + 1][0] != 'U') &&
598                 (dst_idx != DST_CDEF) &&
599                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
600                 double    rate = DNAN;
601                 char     *old_locale;
602
603                 /* the data source type defines how to process the data */
604                 /* pdp_new contains rate * time ... eg the bytes
605                  * transferred during the interval. Doing it this way saves
606                  * a lot of math operations */
607                 switch (dst_idx) {
608                 case DST_COUNTER:
609                 case DST_DERIVE:
610                     for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611                         if ((updvals[i + 1][ii] < '0'
612                              || updvals[i + 1][ii] > '9') && (ii != 0
613                                                               && updvals[i
614                                                                          + 1]
615                                                               [ii] != '-')) {
616                             rrd_set_error("not a simple integer: '%s'",
617                                           updvals[i + 1]);
618                             break;
619                         }
620                     }
621                     if (rrd_test_error()) {
622                         break;
623                     }
624                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
625                         pdp_new[i] =
626                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
627                         if (dst_idx == DST_COUNTER) {
628                             /* simple overflow catcher suggested by Andres Kroonmaa */
629                             /* this will fail terribly for non 32 or 64 bit counters ... */
630                             /* are there any others in SNMP land ? */
631                             if (pdp_new[i] < (double) 0.0)
632                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
633                             if (pdp_new[i] < (double) 0.0)
634                                 pdp_new[i] += (double) 18446744069414584320.0;
635                             /* 2^64-2^32 */ ;
636                         }
637                         rate = pdp_new[i] / interval;
638                     } else {
639                         pdp_new[i] = DNAN;
640                     }
641                     break;
642                 case DST_ABSOLUTE:
643                     old_locale = setlocale(LC_NUMERIC, "C");
644                     errno = 0;
645                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
646                     setlocale(LC_NUMERIC, old_locale);
647                     if (errno > 0) {
648                         rrd_set_error("converting '%s' to float: %s",
649                                       updvals[i + 1], rrd_strerror(errno));
650                         break;
651                     };
652                     if (endptr[0] != '\0') {
653                         rrd_set_error
654                             ("conversion of '%s' to float not complete: tail '%s'",
655                              updvals[i + 1], endptr);
656                         break;
657                     }
658                     rate = pdp_new[i] / interval;
659                     break;
660                 case DST_GAUGE:
661                     errno = 0;
662                     old_locale = setlocale(LC_NUMERIC, "C");
663                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
664                     setlocale(LC_NUMERIC, old_locale);
665                     if (errno > 0) {
666                         rrd_set_error("converting '%s' to float: %s",
667                                       updvals[i + 1], rrd_strerror(errno));
668                         break;
669                     };
670                     if (endptr[0] != '\0') {
671                         rrd_set_error
672                             ("conversion of '%s' to float not complete: tail '%s'",
673                              updvals[i + 1], endptr);
674                         break;
675                     }
676                     rate = pdp_new[i] / interval;
677                     break;
678                 default:
679                     rrd_set_error("rrd contains unknown DS type : '%s'",
680                                   rrd.ds_def[i].dst);
681                     break;
682                 }
683                 /* break out of this for loop if the error string is set */
684                 if (rrd_test_error()) {
685                     break;
686                 }
687                 /* make sure pdp_temp is neither too large or too small
688                  * if any of these occur it becomes unknown ...
689                  * sorry folks ... */
690                 if (!isnan(rate) &&
691                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
692                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
693                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
694                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
695                     pdp_new[i] = DNAN;
696                 }
697             } else {
698                 /* no news is news all the same */
699                 pdp_new[i] = DNAN;
700             }
701
702
703             /* make a copy of the command line argument for the next run */
704 #ifdef DEBUG
705             fprintf(stderr,
706                     "prep ds[%lu]\t"
707                     "last_arg '%s'\t"
708                     "this_arg '%s'\t"
709                     "pdp_new %10.2f\n",
710                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
711 #endif
712             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
713             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
714         }
715         /* break out of the argument parsing loop if the error_string is set */
716         if (rrd_test_error()) {
717             free(step_start);
718             break;
719         }
720         /* has a pdp_st moment occurred since the last run ? */
721
722         if (proc_pdp_st == occu_pdp_st) {
723             /* no we have not passed a pdp_st moment. therefore update is simple */
724
725             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
726                 if (isnan(pdp_new[i])) {
727                     /* this is not realy accurate if we use subsecond data arival time
728                        should have thought of it when going subsecond resolution ...
729                        sorry next format change we will have it! */
730                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
731                         floor(interval);
732                 } else {
733                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
734                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
735                     } else {
736                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
737                     }
738                 }
739 #ifdef DEBUG
740                 fprintf(stderr,
741                         "NO PDP  ds[%lu]\t"
742                         "value %10.2f\t"
743                         "unkn_sec %5lu\n",
744                         i,
745                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
746                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748             }
749         } else {
750             /* an pdp_st has occurred. */
751
752             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
753                rate*seconds which occurred up to the last run.
754                pdp_new[] contains rate*seconds from the latest run.
755                pdp_temp[] will contain the rate for cdp */
756
757             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
758                 /* update pdp_prep to the current pdp_st. */
759                 double    pre_unknown = 0.0;
760
761                 if (isnan(pdp_new[i])) {
762                     /* a final bit of unkonwn to be added bevore calculation
763                        we use a temporary variable for this so that we
764                        don't have to turn integer lines before using the value */
765                     pre_unknown = pre_int;
766                 } else {
767                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
768                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
769                             pdp_new[i] / interval * pre_int;
770                     } else {
771                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
772                             pdp_new[i] / interval * pre_int;
773                     }
774                 }
775
776
777                 /* if too much of the pdp_prep is unknown we dump it */
778                 if (
779                        /* removed because this does not agree with the
780                           definition that a heartbeat can be unknown */
781                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
782                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
783                        /* if the interval is larger thatn mrhb we get NAN */
784                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
785                        (occu_pdp_st - proc_pdp_st <=
786                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
787                     pdp_temp[i] = DNAN;
788                 } else {
789                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
790                         / ((double) (occu_pdp_st - proc_pdp_st
791                                      -
792                                      rrd.pdp_prep[i].
793                                      scratch[PDP_unkn_sec_cnt].u_cnt)
794                            - pre_unknown);
795                 }
796
797                 /* process CDEF data sources; remember each CDEF DS can
798                  * only reference other DS with a lower index number */
799                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
800                     rpnp_t   *rpnp;
801
802                     rpnp =
803                         rpn_expand((rpn_cdefds_t *) &
804                                    (rrd.ds_def[i].par[DS_cdef]));
805                     /* substitute data values for OP_VARIABLE nodes */
806                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
807                         if (rpnp[ii].op == OP_VARIABLE) {
808                             rpnp[ii].op = OP_NUMBER;
809                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
810                         }
811                     }
812                     /* run the rpn calculator */
813                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
814                         free(rpnp);
815                         break;  /* exits the data sources pdp_temp loop */
816                     }
817                 }
818
819                 /* make pdp_prep ready for the next run */
820                 if (isnan(pdp_new[i])) {
821                     /* this is not really accurate if we use subsecond data arrival time
822                        should have thought of it when going subsecond resolution ...
823                        sorry next format change we will have it! */
824                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
825                         floor(post_int);
826                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
827                 } else {
828                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
829                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
830                         pdp_new[i] / interval * post_int;
831                 }
832
833 #ifdef DEBUG
834                 fprintf(stderr,
835                         "PDP UPD ds[%lu]\t"
836                         "pdp_temp %10.2f\t"
837                         "new_prep %10.2f\t"
838                         "new_unkn_sec %5lu\n",
839                         i, pdp_temp[i],
840                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
841                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
842 #endif
843             }
844
845             /* if there were errors during the last loop, bail out here */
846             if (rrd_test_error()) {
847                 free(step_start);
848                 break;
849             }
850
851             /* compute the number of elapsed pdp_st moments */
852             elapsed_pdp_st =
853                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
854 #ifdef DEBUG
855             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
856 #endif
857             if (rra_step_cnt == NULL) {
858                 rra_step_cnt = (unsigned long *)
859                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
860             }
861
862             for (i = 0, rra_start = rra_begin;
863                  i < rrd.stat_head->rra_cnt;
864                  rra_start +=
865                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
866                  sizeof(rrd_value_t), i++) {
867                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
868                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
869                     (proc_pdp_st / rrd.stat_head->pdp_step) %
870                     rrd.rra_def[i].pdp_cnt;
871                 if (start_pdp_offset <= elapsed_pdp_st) {
872                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
873                         rrd.rra_def[i].pdp_cnt + 1;
874                 } else {
875                     rra_step_cnt[i] = 0;
876                 }
877
878                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
879                     /* If this is a bulk update, we need to skip ahead in
880                        the seasonal arrays so that they will be correct for
881                        the next observed value;
882                        note that for the bulk update itself, no update will
883                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
884                        and DEVPREDICT will be set to DNAN. */
885                     if (rra_step_cnt[i] > 2) {
886                         /* skip update by resetting rra_step_cnt[i],
887                            note that this is not data source specific; this is
888                            due to the bulk update, not a DNAN value for the
889                            specific data source. */
890                         rra_step_cnt[i] = 0;
891                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
892                                         elapsed_pdp_st, &last_seasonal_coef);
893                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
894                                         elapsed_pdp_st + 1, &seasonal_coef);
895                     }
896
897                     /* periodically run a smoother for seasonal effects */
898                     /* Need to use first cdp parameter buffer to track
899                      * burnin (burnin requires a specific smoothing schedule).
900                      * The CDP_init_seasonal parameter is really an RRA level,
901                      * not a data source within RRA level parameter, but the rra_def
902                      * is read only for rrd_update (not flushed to disk). */
903                     iii = i * (rrd.stat_head->ds_cnt);
904                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
905                         <= BURNIN_CYCLES) {
906                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907                             > rrd.rra_def[i].row_cnt - 1) {
908                             /* mark off one of the burnin cycles */
909                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
910                                u_cnt);
911                             schedule_smooth = 1;
912                         }
913                     } else {
914                         /* someone has no doubt invented a trick to deal with this
915                          * wrap around, but at least this code is clear. */
916                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
917                             u_cnt > rrd.rra_ptr[i].cur_row) {
918                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
919                              * mapping between PDP and CDP */
920                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
921                                 >=
922                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
923                                 u_cnt) {
924 #ifdef DEBUG
925                                 fprintf(stderr,
926                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927                                         rrd.rra_ptr[i].cur_row,
928                                         elapsed_pdp_st,
929                                         rrd.rra_def[i].
930                                         par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932                                 schedule_smooth = 1;
933                             }
934                         } else {
935                             /* can't rely on negative numbers because we are working with
936                              * unsigned values */
937                             /* Don't need modulus here. If we've wrapped more than once, only
938                              * one smooth is executed at the end. */
939                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
940                                 rrd.rra_def[i].row_cnt
941                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
942                                 rrd.rra_def[i].row_cnt >=
943                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
944                                 u_cnt) {
945 #ifdef DEBUG
946                                 fprintf(stderr,
947                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
948                                         rrd.rra_ptr[i].cur_row,
949                                         elapsed_pdp_st,
950                                         rrd.rra_def[i].
951                                         par[RRA_seasonal_smooth_idx].u_cnt);
952 #endif
953                                 schedule_smooth = 1;
954                             }
955                         }
956                     }
957
958                     rra_current = rrd_tell(rrd_file);
959                 }
960                 /* if cf is DEVSEASONAL or SEASONAL */
961                 if (rrd_test_error())
962                     break;
963
964                 /* update CDP_PREP areas */
965                 /* loop over data sources within each RRA */
966                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
967
968                     /* iii indexes the CDP prep area for this data source within the RRA */
969                     iii = i * rrd.stat_head->ds_cnt + ii;
970
971                     if (rrd.rra_def[i].pdp_cnt > 1) {
972
973                         if (rra_step_cnt[i] > 0) {
974                             /* If we are in this block, at least 1 CDP value will be written to
975                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
976                              * to be written, then the "fill in" value is the CDP_secondary_val
977                              * entry. */
978                             if (isnan(pdp_temp[ii])) {
979                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
980                                     u_cnt += start_pdp_offset;
981                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
982                                     u_val = DNAN;
983                             } else {
984                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
985                                  * CDP data entries. No matter the CF, the value is the same because
986                                  * the average, max, min, and last of a list of identical values is
987                                  * the same, namely, the value itself. */
988                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
989                                     u_val = pdp_temp[ii];
990                             }
991
992                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
993                                 u_cnt >
994                                 rrd.rra_def[i].pdp_cnt *
995                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
996                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
997                                     u_val = DNAN;
998                                 /* initialize carry over */
999                                 if (current_cf == CF_AVERAGE) {
1000                                     if (isnan(pdp_temp[ii])) {
1001                                         rrd.cdp_prep[iii].scratch[CDP_val].
1002                                             u_val = DNAN;
1003                                     } else {
1004                                         rrd.cdp_prep[iii].scratch[CDP_val].
1005                                             u_val =
1006                                             pdp_temp[ii] *
1007                                             ((elapsed_pdp_st -
1008                                               start_pdp_offset) %
1009                                              rrd.rra_def[i].pdp_cnt);
1010                                     }
1011                                 } else {
1012                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1013                                         pdp_temp[ii];
1014                                 }
1015                             } else {
1016                                 rrd_value_t cum_val, cur_val;
1017
1018                                 switch (current_cf) {
1019                                 case CF_AVERAGE:
1020                                     cum_val =
1021                                         IFDNAN(rrd.cdp_prep[iii].
1022                                                scratch[CDP_val].u_val, 0.0);
1023                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1024                                     rrd.cdp_prep[iii].
1025                                         scratch[CDP_primary_val].u_val =
1026                                         (cum_val +
1027                                          cur_val * start_pdp_offset) /
1028                                         (rrd.rra_def[i].pdp_cnt -
1029                                          rrd.cdp_prep[iii].
1030                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1031                                     /* initialize carry over value */
1032                                     if (isnan(pdp_temp[ii])) {
1033                                         rrd.cdp_prep[iii].scratch[CDP_val].
1034                                             u_val = DNAN;
1035                                     } else {
1036                                         rrd.cdp_prep[iii].scratch[CDP_val].
1037                                             u_val =
1038                                             pdp_temp[ii] *
1039                                             ((elapsed_pdp_st -
1040                                               start_pdp_offset) %
1041                                              rrd.rra_def[i].pdp_cnt);
1042                                     }
1043                                     break;
1044                                 case CF_MAXIMUM:
1045                                     cum_val =
1046                                         IFDNAN(rrd.cdp_prep[iii].
1047                                                scratch[CDP_val].u_val, -DINF);
1048                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1049 #ifdef DEBUG
1050                                     if (isnan
1051                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1052                                          u_val) && isnan(pdp_temp[ii])) {
1053                                         fprintf(stderr,
1054                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1055                                                 i, ii);
1056                                         exit(-1);
1057                                     }
1058 #endif
1059                                     if (cur_val > cum_val)
1060                                         rrd.cdp_prep[iii].
1061                                             scratch[CDP_primary_val].u_val =
1062                                             cur_val;
1063                                     else
1064                                         rrd.cdp_prep[iii].
1065                                             scratch[CDP_primary_val].u_val =
1066                                             cum_val;
1067                                     /* initialize carry over value */
1068                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1069                                         pdp_temp[ii];
1070                                     break;
1071                                 case CF_MINIMUM:
1072                                     cum_val =
1073                                         IFDNAN(rrd.cdp_prep[iii].
1074                                                scratch[CDP_val].u_val, DINF);
1075                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1076 #ifdef DEBUG
1077                                     if (isnan
1078                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1079                                          u_val) && isnan(pdp_temp[ii])) {
1080                                         fprintf(stderr,
1081                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1082                                                 i, ii);
1083                                         exit(-1);
1084                                     }
1085 #endif
1086                                     if (cur_val < cum_val)
1087                                         rrd.cdp_prep[iii].
1088                                             scratch[CDP_primary_val].u_val =
1089                                             cur_val;
1090                                     else
1091                                         rrd.cdp_prep[iii].
1092                                             scratch[CDP_primary_val].u_val =
1093                                             cum_val;
1094                                     /* initialize carry over value */
1095                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1096                                         pdp_temp[ii];
1097                                     break;
1098                                 case CF_LAST:
1099                                 default:
1100                                     rrd.cdp_prep[iii].
1101                                         scratch[CDP_primary_val].u_val =
1102                                         pdp_temp[ii];
1103                                     /* initialize carry over value */
1104                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1105                                         pdp_temp[ii];
1106                                     break;
1107                                 }
1108                             }   /* endif meets xff value requirement for a valid value */
1109                             /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1110                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1111                             if (isnan(pdp_temp[ii]))
1112                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1113                                     u_cnt =
1114                                     (elapsed_pdp_st -
1115                                      start_pdp_offset) %
1116                                     rrd.rra_def[i].pdp_cnt;
1117                             else
1118                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1119                                     u_cnt = 0;
1120                         } else {    /* rra_step_cnt[i]  == 0 */
1121
1122 #ifdef DEBUG
1123                             if (isnan
1124                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1125                                 fprintf(stderr,
1126                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1127                                         i, ii);
1128                             } else {
1129                                 fprintf(stderr,
1130                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1131                                         i, ii,
1132                                         rrd.cdp_prep[iii].scratch[CDP_val].
1133                                         u_val);
1134                             }
1135 #endif
1136                             if (isnan(pdp_temp[ii])) {
1137                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1138                                     u_cnt += elapsed_pdp_st;
1139                             } else
1140                                 if (isnan
1141                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1142                                      u_val)) {
1143                                 if (current_cf == CF_AVERAGE) {
1144                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1145                                         pdp_temp[ii] * elapsed_pdp_st;
1146                                 } else {
1147                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1148                                         pdp_temp[ii];
1149                                 }
1150 #ifdef DEBUG
1151                                 fprintf(stderr,
1152                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1153                                         i, ii,
1154                                         rrd.cdp_prep[iii].scratch[CDP_val].
1155                                         u_val);
1156 #endif
1157                             } else {
1158                                 switch (current_cf) {
1159                                 case CF_AVERAGE:
1160                                     rrd.cdp_prep[iii].scratch[CDP_val].
1161                                         u_val +=
1162                                         pdp_temp[ii] * elapsed_pdp_st;
1163                                     break;
1164                                 case CF_MINIMUM:
1165                                     if (pdp_temp[ii] <
1166                                         rrd.cdp_prep[iii].scratch[CDP_val].
1167                                         u_val)
1168                                         rrd.cdp_prep[iii].scratch[CDP_val].
1169                                             u_val = pdp_temp[ii];
1170                                     break;
1171                                 case CF_MAXIMUM:
1172                                     if (pdp_temp[ii] >
1173                                         rrd.cdp_prep[iii].scratch[CDP_val].
1174                                         u_val)
1175                                         rrd.cdp_prep[iii].scratch[CDP_val].
1176                                             u_val = pdp_temp[ii];
1177                                     break;
1178                                 case CF_LAST:
1179                                 default:
1180                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1181                                         pdp_temp[ii];
1182                                     break;
1183                                 }
1184                             }
1185                         }
1186                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1187                         if (elapsed_pdp_st > 2) {
1188                             switch (current_cf) {
1189                             case CF_AVERAGE:
1190                             default:
1191                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1192                                     u_val = pdp_temp[ii];
1193                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1194                                     u_val = pdp_temp[ii];
1195                                 break;
1196                             case CF_SEASONAL:
1197                             case CF_DEVSEASONAL:
1198                                 /* need to update cached seasonal values, so they are consistent
1199                                  * with the bulk update */
1200                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1201                                  * CDP_last_deviation are the same. */
1202                                 rrd.cdp_prep[iii].
1203                                     scratch[CDP_hw_last_seasonal].u_val =
1204                                     last_seasonal_coef[ii];
1205                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1206                                     u_val = seasonal_coef[ii];
1207                                 break;
1208                             case CF_HWPREDICT:
1209                             case CF_MHWPREDICT:
1210                                 /* need to update the null_count and last_null_count.
1211                                  * even do this for non-DNAN pdp_temp because the
1212                                  * algorithm is not learning from batch updates. */
1213                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1214                                     u_cnt += elapsed_pdp_st;
1215                                 rrd.cdp_prep[iii].
1216                                     scratch[CDP_last_null_count].u_cnt +=
1217                                     elapsed_pdp_st - 1;
1218                                 /* fall through */
1219                             case CF_DEVPREDICT:
1220                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1221                                     u_val = DNAN;
1222                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1223                                     u_val = DNAN;
1224                                 break;
1225                             case CF_FAILURES:
1226                                 /* do not count missed bulk values as failures */
1227                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1228                                     u_val = 0;
1229                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1230                                     u_val = 0;
1231                                 /* need to reset violations buffer.
1232                                  * could do this more carefully, but for now, just
1233                                  * assume a bulk update wipes away all violations. */
1234                                 erase_violations(&rrd, iii, i);
1235                                 break;
1236                             }
1237                         }
1238                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1239
1240                     if (rrd_test_error())
1241                         break;
1242
1243                 }       /* endif data sources loop */
1244             }           /* end RRA Loop */
1245
1246             /* this loop is only entered if elapsed_pdp_st < 3 */
1247             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1248                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1249                 for (i = 0, rra_start = rra_begin;
1250                      i < rrd.stat_head->rra_cnt;
1251                      rra_start +=
1252                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1253                      sizeof(rrd_value_t), i++) {
1254                     if (rrd.rra_def[i].pdp_cnt > 1)
1255                         continue;
1256
1257                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1258                     if (current_cf == CF_SEASONAL
1259                         || current_cf == CF_DEVSEASONAL) {
1260                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1261                                         elapsed_pdp_st + (scratch_idx ==
1262                                                           CDP_primary_val ? 1
1263                                                           : 2),
1264                                         &seasonal_coef);
1265                         rra_current = rrd_tell(rrd_file);
1266                     }
1267                     if (rrd_test_error())
1268                         break;
1269                     /* loop over data sources within each RRA */
1270                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1271                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1272                                            i * (rrd.stat_head->ds_cnt) + ii,
1273                                            i, ii, scratch_idx, seasonal_coef);
1274                     }
1275                 }       /* end RRA Loop */
1276                 if (rrd_test_error())
1277                     break;
1278             }           /* end elapsed_pdp_st loop */
1279
1280             if (rrd_test_error())
1281                 break;
1282
1283             /* Ready to write to disk */
1284             /* Move sequentially through the file, writing one RRA at a time.
1285              * Note this architecture divorces the computation of CDP with
1286              * flushing updated RRA entries to disk. */
1287             for (i = 0, rra_start = rra_begin;
1288                  i < rrd.stat_head->rra_cnt;
1289                  rra_start +=
1290                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1291                  sizeof(rrd_value_t), i++) {
1292                 /* is there anything to write for this RRA? If not, continue. */
1293                 if (rra_step_cnt[i] == 0)
1294                     continue;
1295
1296                 /* write the first row */
1297 #ifdef DEBUG
1298                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1299 #endif
1300                 rrd.rra_ptr[i].cur_row++;
1301                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1302                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1303                 /* position on the first row */
1304                 rra_pos_tmp = rra_start +
1305                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1306                     sizeof(rrd_value_t);
1307                 if (rra_pos_tmp != rra_current) {
1308                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1309                         rrd_set_error("seek error in rrd");
1310                         break;
1311                     }
1312                     rra_current = rra_pos_tmp;
1313                 }
1314 #ifdef DEBUG
1315                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1316 #endif
1317                 scratch_idx = CDP_primary_val;
1318                 if (pcdp_summary != NULL) {
1319                     rra_time = (current_time - current_time
1320                                 % (rrd.rra_def[i].pdp_cnt *
1321                                    rrd.stat_head->pdp_step))
1322                         -
1323                         ((rra_step_cnt[i] -
1324                           1) * rrd.rra_def[i].pdp_cnt *
1325                          rrd.stat_head->pdp_step);
1326                 }
1327                 pcdp_summary =
1328                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1329                                   scratch_idx, pcdp_summary, &rra_time);
1330                 if (rrd_test_error())
1331                     break;
1332
1333                 /* write other rows of the bulk update, if any */
1334                 scratch_idx = CDP_secondary_val;
1335                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1336                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1337 #ifdef DEBUG
1338                         fprintf(stderr,
1339                                 "Wraparound for RRA %s, %lu updates left\n",
1340                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1341 #endif
1342                         /* wrap */
1343                         rrd.rra_ptr[i].cur_row = 0;
1344                         /* seek back to beginning of current rra */
1345                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1346                             rrd_set_error("seek error in rrd");
1347                             break;
1348                         }
1349 #ifdef DEBUG
1350                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1351                                 rrd_file->pos);
1352 #endif
1353                         rra_current = rra_start;
1354                     }
1355                     if (pcdp_summary != NULL) {
1356                         rra_time = (current_time - current_time
1357                                     % (rrd.rra_def[i].pdp_cnt *
1358                                        rrd.stat_head->pdp_step))
1359                             -
1360                             ((rra_step_cnt[i] -
1361                               2) * rrd.rra_def[i].pdp_cnt *
1362                              rrd.stat_head->pdp_step);
1363                     }
1364                     pcdp_summary =
1365                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1366                                       scratch_idx, pcdp_summary, &rra_time);
1367                 }
1368
1369                 if (rrd_test_error())
1370                     break;
1371             }           /* RRA LOOP */
1372
1373             /* break out of the argument parsing loop if error_string is set */
1374             if (rrd_test_error()) {
1375                 free(step_start);
1376                 break;
1377             }
1378
1379         }               /* endif a pdp_st has occurred */
1380         rrd.live_head->last_up = current_time;
1381         rrd.live_head->last_up_usec = current_time_usec;
1382         free(step_start);
1383     }                   /* function argument loop */
1384
1385     if (seasonal_coef != NULL)
1386         free(seasonal_coef);
1387     if (last_seasonal_coef != NULL)
1388         free(last_seasonal_coef);
1389     if (rra_step_cnt != NULL)
1390         free(rra_step_cnt);
1391     rpnstack_free(&rpnstack);
1392
1393 #if 0
1394     //rrd_flush(rrd_file);    //XXX: really needed?
1395 #endif
1396     /* if we got here and if there is an error and if the file has not been
1397      * written to, then close things up and return. */
1398     if (rrd_test_error()) {
1399         goto err_free_pdp_new;
1400     }
1401
1402     /* aargh ... that was tough ... so many loops ... anyway, its done.
1403      * we just need to write back the live header portion now*/
1404
1405     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1406                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1407                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1408                  SEEK_SET) != 0) {
1409         rrd_set_error("seek rrd for live header writeback");
1410         goto err_free_pdp_new;
1411     }
1412     /* for mmap, we did already write to the underlying mapping, so we do
1413        not need to write again.  */
1414 #ifndef HAVE_MMAP
1415     if (version >= 3) {
1416         if (rrd_write(rrd_file, rrd.live_head,
1417                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1418             rrd_set_error("rrd_write live_head to rrd");
1419             goto err_free_pdp_new;
1420         }
1421     } else {
1422         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1423                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1424             rrd_set_error("rrd_write live_head to rrd");
1425             goto err_free_pdp_new;
1426         }
1427     }
1428
1429
1430     if (rrd_write(rrd_file, rrd.pdp_prep,
1431                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1432         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1433         rrd_set_error("rrd_write pdp_prep to rrd");
1434         goto err_free_pdp_new;
1435     }
1436
1437     if (rrd_write(rrd_file, rrd.cdp_prep,
1438                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1439                   rrd.stat_head->ds_cnt)
1440         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1441                       rrd.stat_head->ds_cnt)) {
1442
1443         rrd_set_error("rrd_write cdp_prep to rrd");
1444         goto err_free_pdp_new;
1445     }
1446
1447     if (rrd_write(rrd_file, rrd.rra_ptr,
1448                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1449         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1450         rrd_set_error("rrd_write rra_ptr to rrd");
1451         goto err_free_pdp_new;
1452     }
1453 #endif
1454
1455     /* rrd_flush(rrd_file); */
1456
1457     /* calling the smoothing code here guarantees at most
1458      * one smoothing operation per rrd_update call. Unfortunately,
1459      * it is possible with bulk updates, or a long-delayed update
1460      * for smoothing to occur off-schedule. This really isn't
1461      * critical except during the burning cycles. */
1462     if (schedule_smooth) {
1463
1464         rra_start = rra_begin;
1465         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1466             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1467                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1468 #ifdef DEBUG
1469                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1470 #endif
1471                 apply_smoother(&rrd, i, rra_start, rrd_file);
1472                 if (rrd_test_error())
1473                     break;
1474             }
1475             rra_start += rrd.rra_def[i].row_cnt
1476                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1477         }
1478     }
1479
1480 /*    rrd_dontneed(rrd_file,&rrd); */
1481     rrd_free(&rrd);
1482     rrd_close(rrd_file);
1483
1484     free(pdp_new);
1485     free(tmpl_idx);
1486     free(pdp_temp);
1487     free(updvals);
1488     return (0);
1489
1490   err_free_pdp_new:
1491     free(pdp_new);
1492   err_free_tmpl_idx:
1493     free(tmpl_idx);
1494   err_free_pdp_temp:
1495     free(pdp_temp);
1496   err_free_updvals:
1497     free(updvals);
1498   err_close:
1499     rrd_close(rrd_file);
1500   err_free:
1501     rrd_free(&rrd);
1502   err_out:
1503     return (-1);
1504 }
1505
1506 /*
1507  * get exclusive lock to whole file.
1508  * lock gets removed when we close the file
1509  *
1510  * returns 0 on success
1511  */
int LockRRD(
    int in_file)
{
    /*
     * Request an exclusive lock covering the file behind descriptor
     * in_file and hand the platform call's result straight back to the
     * caller: -1 on failure (also when the file size cannot be determined
     * on WIN32), the locking call's success value otherwise.
     * Both variants use the non-waiting request (_LK_NBLCK / F_SETLK).
     */
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() needs a byte count, so look up the current file size */
    if (_fstat(in_file, &st) != 0) {
        return (-1);
    }

    /* NOTE(review): per the CRT documentation _locking() counts the bytes
     * from the current file position, not from offset 0 -- confirm that
     * every caller invokes LockRRD at the same offset. */
    return (_locking(in_file, _LK_NBLCK, st.st_size));
#else
    /* zero everything first so that l_pid and any implementation specific
     * members are not handed to fcntl() with indeterminate contents */
    struct flock lock = { 0 };

    lock.l_type = F_WRLCK;  /* exclusive write lock */
    lock.l_whence = SEEK_SET;   /* l_start counts from the start of file */
    lock.l_start = 0;   /* begin at the first byte */
    lock.l_len = 0; /* 0 means: up to the end of file, i.e. whole file */

    return (fcntl(in_file, F_SETLK, &lock));
#endif
}