don't force data out ... let cache management do this
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
38
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
60 /*
61  * normalize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static inline void normalize_time(
65     struct timeval *t)
66 {
67     if (t->tv_usec < 0) {
68         t->tv_sec--;
69         t->tv_usec += 1000000L;
70     }
71 }
72
73 static inline info_t *write_RRA_row(
74     rrd_file_t *rrd_file,
75     rrd_t *rrd,
76     unsigned long rra_idx,
77     unsigned long *rra_current,
78     unsigned short CDP_scratch_idx,
79     info_t *pcdp_summary,
80     time_t *rra_time)
81 {
82     unsigned long ds_idx, cdp_idx;
83     infoval   iv;
84
85     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86         /* compute the cdp index */
87         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
88 #ifdef DEBUG
89         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
92 #endif
93         if (pcdp_summary != NULL) {
94             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95             /* append info to the return hash */
96             pcdp_summary = info_push(pcdp_summary,
97                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
98                                                    *rra_time,
99                                                    rrd->rra_def[rra_idx].
100                                                    cf_nam,
101                                                    rrd->rra_def[rra_idx].
102                                                    pdp_cnt,
103                                                    rrd->ds_def[ds_idx].
104                                                    ds_nam), RD_I_VAL, iv);
105         }
106         if (rrd_write
107             (rrd_file,
108              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111             return 0;
112         }
113         *rra_current += sizeof(rrd_value_t);
114     }
115     return (pcdp_summary);
116 }
117
118 int       rrd_update_r(
119     const char *filename,
120     const char *tmplt,
121     int argc,
122     const char **argv);
123 int       _rrd_update(
124     const char *filename,
125     const char *tmplt,
126     int argc,
127     const char **argv,
128     info_t *);
129
130 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
131
132
133 info_t   *rrd_update_v(
134     int argc,
135     char **argv)
136 {
137     char     *tmplt = NULL;
138     info_t   *result = NULL;
139     infoval   rc;
140     struct option long_options[] = {
141         {"template", required_argument, 0, 't'},
142         {0, 0, 0, 0}
143     };
144
145     rc.u_int = -1;
146     optind = 0;
147     opterr = 0;         /* initialize getopt */
148
149     while (1) {
150         int       option_index = 0;
151         int       opt;
152
153         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
154
155         if (opt == EOF)
156             break;
157
158         switch (opt) {
159         case 't':
160             tmplt = optarg;
161             break;
162
163         case '?':
164             rrd_set_error("unknown option '%s'", argv[optind - 1]);
165             goto end_tag;
166         }
167     }
168
169     /* need at least 2 arguments: filename, data. */
170     if (argc - optind < 2) {
171         rrd_set_error("Not enough arguments");
172         goto end_tag;
173     }
174     rc.u_int = 0;
175     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176     rc.u_int = _rrd_update(argv[optind], tmplt,
177                            argc - optind - 1,
178                            (const char **) (argv + optind + 1), result);
179     result->value.u_int = rc.u_int;
180   end_tag:
181     return result;
182 }
183
184 int rrd_update(
185     int argc,
186     char **argv)
187 {
188     struct option long_options[] = {
189         {"template", required_argument, 0, 't'},
190         {0, 0, 0, 0}
191     };
192     int       option_index = 0;
193     int       opt;
194     char     *tmplt = NULL;
195     int       rc;
196
197     optind = 0;
198     opterr = 0;         /* initialize getopt */
199
200     while (1) {
201         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
202
203         if (opt == EOF)
204             break;
205
206         switch (opt) {
207         case 't':
208             tmplt = strdup(optarg);
209             break;
210
211         case '?':
212             rrd_set_error("unknown option '%s'", argv[optind - 1]);
213             return (-1);
214         }
215     }
216
217     /* need at least 2 arguments: filename, data. */
218     if (argc - optind < 2) {
219         rrd_set_error("Not enough arguments");
220
221         return -1;
222     }
223
224     rc = rrd_update_r(argv[optind], tmplt,
225                       argc - optind - 1, (const char **) (argv + optind + 1));
226     free(tmplt);
227     return rc;
228 }
229
230 int rrd_update_r(
231     const char *filename,
232     const char *tmplt,
233     int argc,
234     const char **argv)
235 {
236     return _rrd_update(filename, tmplt, argc, argv, NULL);
237 }
238
239 int _rrd_update(
240     const char *filename,
241     const char *tmplt,
242     int argc,
243     const char **argv,
244     info_t *pcdp_summary)
245 {
246
247     int       arg_i = 2;
248     short     j;
249     unsigned long i, ii, iii = 1;
250
251     unsigned long rra_begin;    /* byte pointer to the rra
252                                  * area in the rrd file.  this
253                                  * pointer never changes value */
254     unsigned long rra_start;    /* byte pointer to the rra
255                                  * area in the rrd file.  this
256                                  * pointer changes as each rrd is
257                                  * processed. */
258     unsigned long rra_current;  /* byte pointer to the current write
259                                  * spot in the rrd file. */
260     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
261     double    interval, pre_int, post_int;  /* interval between this and
262                                              * the last run */
263     unsigned long proc_pdp_st;  /* which pdp_st was the last
264                                  * to be processed */
265     unsigned long occu_pdp_st;  /* when was the pdp_st
266                                  * before the last update
267                                  * time */
268     unsigned long proc_pdp_age; /* how old was the data in
269                                  * the pdp prep area when it
270                                  * was last updated */
271     unsigned long occu_pdp_age; /* how long ago was the last
272                                  * pdp_step time */
273     rrd_value_t *pdp_new;   /* prepare the incoming data
274                              * to be added the the
275                              * existing entry */
276     rrd_value_t *pdp_temp;  /* prepare the pdp values
277                              * to be added the the
278                              * cdp values */
279
280     long     *tmpl_idx; /* index representing the settings
281                            transported by the tmplt index */
282     unsigned long tmpl_cnt = 2; /* time and data */
283
284     rrd_t     rrd;
285     time_t    current_time = 0;
286     time_t    rra_time = 0; /* time of update for a RRA */
287     unsigned long current_time_usec = 0;    /* microseconds part of current time */
288     struct timeval tmp_time;    /* used for time conversion */
289
290     char    **updvals;
291     int       schedule_smooth = 0;
292     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
293
294     /* a vector of future Holt-Winters seasonal coefs */
295     unsigned long elapsed_pdp_st;
296
297     /* number of elapsed PDP steps since last update */
298     unsigned long *rra_step_cnt = NULL;
299
300     /* number of rows to be updated in an RRA for a data
301      * value. */
302     unsigned long start_pdp_offset;
303
304     /* number of PDP steps since the last update that
305      * are assigned to the first CDP to be generated
306      * since the last update. */
307     unsigned short scratch_idx;
308
309     /* index into the CDP scratch array */
310     enum cf_en current_cf;
311
312     /* numeric id of the current consolidation function */
313     rpnstack_t rpnstack;    /* used for COMPUTE DS */
314     int       version;  /* rrd version */
315     char     *endptr;   /* used in the conversion */
316     rrd_file_t *rrd_file;
317
318     rpnstack_init(&rpnstack);
319
320     /* need at least 1 arguments: data. */
321     if (argc < 1) {
322         rrd_set_error("Not enough arguments");
323         goto err_out;
324     }
325
326     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327     if (rrd_file == NULL) {
328         goto err_free;
329     }
330     /* We are now at the beginning of the rra's */
331     rra_current = rra_start = rra_begin = rrd_file->header_len;
332
333     /* initialize time */
334     version = atoi(rrd.stat_head->version);
335     gettimeofday(&tmp_time, 0);
336     normalize_time(&tmp_time);
337     current_time = tmp_time.tv_sec;
338     if (version >= 3) {
339         current_time_usec = tmp_time.tv_usec;
340     } else {
341         current_time_usec = 0;
342     }
343
344     /* get exclusive lock to whole file.
345      * lock gets removed when we close the file.
346      */
347     if (LockRRD(rrd_file->fd) != 0) {
348         rrd_set_error("could not lock RRD");
349         goto err_close;
350     }
351
352     if ((updvals =
353          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354         rrd_set_error("allocating updvals pointer array");
355         goto err_close;
356     }
357
358     if ((pdp_temp = malloc(sizeof(rrd_value_t)
359                            * rrd.stat_head->ds_cnt)) == NULL) {
360         rrd_set_error("allocating pdp_temp ...");
361         goto err_free_updvals;
362     }
363
364     if ((tmpl_idx = malloc(sizeof(unsigned long)
365                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366         rrd_set_error("allocating tmpl_idx ...");
367         goto err_free_pdp_temp;
368     }
369     /* initialize tmplt redirector */
370     /* default config example (assume DS 1 is a CDEF DS)
371        tmpl_idx[0] -> 0; (time)
372        tmpl_idx[1] -> 1; (DS 0)
373        tmpl_idx[2] -> 3; (DS 2)
374        tmpl_idx[3] -> 4; (DS 3) */
375     tmpl_idx[0] = 0;    /* time */
376     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
378             tmpl_idx[ii++] = i;
379     }
380     tmpl_cnt = ii;
381
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char     *dsname;
385         unsigned int tmpl_len;
386         char     *tmplt_copy = strdup(tmplt);
387
388         dsname = tmplt_copy;
389         tmpl_cnt = 1;   /* the first entry is the time */
390         tmpl_len = strlen(tmplt_copy);
391         for (i = 0; i <= tmpl_len; i++) {
392             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393                 tmplt_copy[i] = '\0';
394                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
395                     rrd_set_error
396                         ("tmplt contains more DS definitions than RRD");
397                     goto err_free_tmpl_idx;
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400                     rrd_set_error("unknown DS name '%s'", dsname);
401                     goto err_free_tmpl_idx;
402                 } else {
403                     /* the first element is always the time */
404                     tmpl_idx[tmpl_cnt - 1]++;
405                     /* go to the next entry on the tmplt_copy */
406                     dsname = &tmplt_copy[i + 1];
407                     /* fix the damage we did before */
408                     if (i < tmpl_len) {
409                         tmplt_copy[i] = ':';
410                     }
411
412                 }
413             }
414         }
415         free(tmplt_copy);
416     }
417     if ((pdp_new = malloc(sizeof(rrd_value_t)
418                           * rrd.stat_head->ds_cnt)) == NULL) {
419         rrd_set_error("allocating pdp_new ...");
420         goto err_free_tmpl_idx;
421     }
422     /* loop through the arguments. */
423     for (arg_i = 0; arg_i < argc; arg_i++) {
424         char     *stepper = strdup(argv[arg_i]);
425         char     *step_start = stepper;
426         char     *p;
427         char     *parsetime_error = NULL;
428         enum { atstyle, normal } timesyntax;
429         struct rrd_time_value ds_tv;
430
431         if (stepper == NULL) {
432             rrd_set_error("failed duplication argv entry");
433             free(step_start);
434             goto err_free_pdp_new;
435         }
436         /* initialize all ds input to unknown except the first one
437            which has always got to be set */
438         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
439             updvals[ii] = "U";
440         updvals[0] = stepper;
441         /* separate all ds elements; first must be examined separately
442            due to alternate time syntax */
443         if ((p = strchr(stepper, '@')) != NULL) {
444             timesyntax = atstyle;
445             *p = '\0';
446             stepper = p + 1;
447         } else if ((p = strchr(stepper, ':')) != NULL) {
448             timesyntax = normal;
449             *p = '\0';
450             stepper = p + 1;
451         } else {
452             rrd_set_error
453                 ("expected timestamp not found in data source from %s",
454                  argv[arg_i]);
455             free(step_start);
456             break;
457         }
458         ii = 1;
459         updvals[tmpl_idx[ii]] = stepper;
460         while (*stepper) {
461             if (*stepper == ':') {
462                 *stepper = '\0';
463                 ii++;
464                 if (ii < tmpl_cnt) {
465                     updvals[tmpl_idx[ii]] = stepper + 1;
466                 }
467             }
468             stepper++;
469         }
470
471         if (ii != tmpl_cnt - 1) {
472             rrd_set_error
473                 ("expected %lu data source readings (got %lu) from %s",
474                  tmpl_cnt - 1, ii, argv[arg_i]);
475             free(step_start);
476             break;
477         }
478
479         /* get the time from the reading ... handle N */
480         if (timesyntax == atstyle) {
481             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483                 free(step_start);
484                 break;
485             }
486             if (ds_tv.type == RELATIVE_TO_END_TIME ||
487                 ds_tv.type == RELATIVE_TO_START_TIME) {
488                 rrd_set_error("specifying time relative to the 'start' "
489                               "or 'end' makes no sense here: %s", updvals[0]);
490                 free(step_start);
491                 break;
492             }
493
494             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
495
496             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
497
498         } else if (strcmp(updvals[0], "N") == 0) {
499             gettimeofday(&tmp_time, 0);
500             normalize_time(&tmp_time);
501             current_time = tmp_time.tv_sec;
502             current_time_usec = tmp_time.tv_usec;
503         } else {
504             double    tmp;
505             char    *old_locale;
506             old_locale = setlocale(LC_NUMERIC,"C");
507             tmp = strtod(updvals[0], 0);
508             setlocale(LC_NUMERIC,old_locale);
509             current_time = floor(tmp);
510             current_time_usec =
511                 (long) ((tmp - (double) current_time) * 1000000.0);
512         }
513         /* dont do any correction for old version RRDs */
514         if (version < 3)
515             current_time_usec = 0;
516
517         if (current_time < rrd.live_head->last_up ||
518             (current_time == rrd.live_head->last_up &&
519              (long) current_time_usec <=
520              (long) rrd.live_head->last_up_usec)) {
521             rrd_set_error("illegal attempt to update using time %ld when "
522                           "last update time is %ld (minimum one second step)",
523                           current_time, rrd.live_head->last_up);
524             free(step_start);
525             break;
526         }
527
528         /* seek to the beginning of the rra's */
529         if (rra_current != rra_begin) {
530 #ifndef HAVE_MMAP
531             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
532                 rrd_set_error("seek error in rrd");
533                 free(step_start);
534                 break;
535             }
536 #endif
537             rra_current = rra_begin;
538         }
539         rra_start = rra_begin;
540
541         /* when was the current pdp started */
542         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
543         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
544
545         /* when did the last pdp_st occur */
546         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
547         occu_pdp_st = current_time - occu_pdp_age;
548
549         /* interval = current_time - rrd.live_head->last_up; */
550         interval = (double) (current_time - rrd.live_head->last_up)
551             + (double) ((long) current_time_usec -
552                         (long) rrd.live_head->last_up_usec) / 1000000.0;
553
554         if (occu_pdp_st > proc_pdp_st) {
555             /* OK we passed the pdp_st moment */
556             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
557                                                                      * occurred before the latest
558                                                                      * pdp_st moment*/
559             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
560             post_int = occu_pdp_age;    /* how much after it */
561             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
562         } else {
563             pre_int = interval;
564             post_int = 0;
565         }
566
567 #ifdef DEBUG
568         printf("proc_pdp_age %lu\t"
569                "proc_pdp_st %lu\t"
570                "occu_pfp_age %lu\t"
571                "occu_pdp_st %lu\t"
572                "int %lf\t"
573                "pre_int %lf\t"
574                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
575                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
576 #endif
577
578         /* process the data sources and update the pdp_prep 
579          * area accordingly */
580         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
581             enum dst_en dst_idx;
582
583             dst_idx = dst_conv(rrd.ds_def[i].dst);
584
585             /* make sure we do not build diffs with old last_ds values */
586             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
587                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
588                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
589             }
590
591             /* NOTE: DST_CDEF should never enter this if block, because
592              * updvals[i+1][0] is initialized to 'U'; unless the caller
593              * accidently specified a value for the DST_CDEF. To handle
594              * this case, an extra check is required. */
595
596             if ((updvals[i + 1][0] != 'U') &&
597                 (dst_idx != DST_CDEF) &&
598                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
599                 double    rate = DNAN;
600                 char    *old_locale;
601
602                 /* the data source type defines how to process the data */
603                 /* pdp_new contains rate * time ... eg the bytes
604                  * transferred during the interval. Doing it this way saves
605                  * a lot of math operations */
606                 switch (dst_idx) {
607                 case DST_COUNTER:
608                 case DST_DERIVE:
609                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
610                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611                             if ((updvals[i + 1][ii] < '0'
612                                  || updvals[i + 1][ii] > '9') && (ii != 0
613                                                                   && updvals[i
614                                                                              +
615                                                                              1]
616                                                                   [ii] !=
617                                                                   '-')) {
618                                 rrd_set_error("not a simple integer: '%s'",
619                                               updvals[i + 1]);
620                                 break;
621                             }
622                         }
623                         if (rrd_test_error()) {
624                             break;
625                         }
626                         pdp_new[i] =
627                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
628                         if (dst_idx == DST_COUNTER) {
629                             /* simple overflow catcher suggested by Andres Kroonmaa */
630                             /* this will fail terribly for non 32 or 64 bit counters ... */
631                             /* are there any others in SNMP land ? */
632                             if (pdp_new[i] < (double) 0.0)
633                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
634                             if (pdp_new[i] < (double) 0.0)
635                                 pdp_new[i] += (double) 18446744069414584320.0;
636                             /* 2^64-2^32 */ ;
637                         }
638                         rate = pdp_new[i] / interval;
639                     } else {
640                         pdp_new[i] = DNAN;
641                     }
642                     break;
643                 case DST_ABSOLUTE:
644                     old_locale = setlocale(LC_NUMERIC,"C");
645                     errno = 0;
646                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
647                     setlocale(LC_NUMERIC,old_locale);
648                     if (errno > 0) {
649                         rrd_set_error("converting '%s' to float: %s",
650                                       updvals[i + 1], rrd_strerror(errno));
651                         break;
652                     };
653                     if (endptr[0] != '\0') {
654                         rrd_set_error
655                             ("conversion of '%s' to float not complete: tail '%s'",
656                              updvals[i + 1], endptr);
657                         break;
658                     }
659                     rate = pdp_new[i] / interval;
660                     break;
661                 case DST_GAUGE:
662                     errno = 0;
663                     old_locale = setlocale(LC_NUMERIC,"C");
664                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
665                     setlocale(LC_NUMERIC,old_locale);
666                     if (errno > 0) {
667                         rrd_set_error("converting '%s' to float: %s",
668                                       updvals[i + 1], rrd_strerror(errno));
669                         break;
670                     };
671                     if (endptr[0] != '\0') {
672                         rrd_set_error
673                             ("conversion of '%s' to float not complete: tail '%s'",
674                              updvals[i + 1], endptr);
675                         break;
676                     }
677                     rate = pdp_new[i] / interval;
678                     break;
679                 default:
680                     rrd_set_error("rrd contains unknown DS type : '%s'",
681                                   rrd.ds_def[i].dst);
682                     break;
683                 }
684                 /* break out of this for loop if the error string is set */
685                 if (rrd_test_error()) {
686                     break;
687                 }
688                 /* make sure pdp_temp is neither too large or too small
689                  * if any of these occur it becomes unknown ...
690                  * sorry folks ... */
691                 if (!isnan(rate) &&
692                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
693                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
694                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
695                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
696                     pdp_new[i] = DNAN;
697                 }
698             } else {
699                 /* no news is news all the same */
700                 pdp_new[i] = DNAN;
701             }
702
703
704             /* make a copy of the command line argument for the next run */
705 #ifdef DEBUG
706             fprintf(stderr,
707                     "prep ds[%lu]\t"
708                     "last_arg '%s'\t"
709                     "this_arg '%s'\t"
710                     "pdp_new %10.2f\n",
711                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
712 #endif
713             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
714             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
715         }
716         /* break out of the argument parsing loop if the error_string is set */
717         if (rrd_test_error()) {
718             free(step_start);
719             break;
720         }
721         /* has a pdp_st moment occurred since the last run ? */
722
723         if (proc_pdp_st == occu_pdp_st) {
724             /* no we have not passed a pdp_st moment. therefore update is simple */
725
726             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
727                 if (isnan(pdp_new[i])) {
728                     /* this is not realy accurate if we use subsecond data arival time
729                        should have thought of it when going subsecond resolution ...
730                        sorry next format change we will have it! */
731                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
732                         floor(interval);
733                 } else {
734                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
735                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
736                     } else {
737                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
738                     }
739                 }
740 #ifdef DEBUG
741                 fprintf(stderr,
742                         "NO PDP  ds[%lu]\t"
743                         "value %10.2f\t"
744                         "unkn_sec %5lu\n",
745                         i,
746                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
747                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
748 #endif
749             }
750         } else {
751             /* an pdp_st has occurred. */
752
753             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
754                rate*seconds which occurred up to the last run.
755                pdp_new[] contains rate*seconds from the latest run.
756                pdp_temp[] will contain the rate for cdp */
757
758             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
759                 /* update pdp_prep to the current pdp_st. */
760                 double    pre_unknown = 0.0;
761
762                 if (isnan(pdp_new[i])) {
763                     /* a final bit of unkonwn to be added bevore calculation
764                        we use a temporary variable for this so that we
765                        don't have to turn integer lines before using the value */
766                     pre_unknown = pre_int;
767                 } else {
768                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
769                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
770                             pdp_new[i] / interval * pre_int;
771                     } else {
772                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
773                             pdp_new[i] / interval * pre_int;
774                     }
775                 }
776
777
778                 /* if too much of the pdp_prep is unknown we dump it */
779                 if (
780                        /* removed because this does not agree with the
781                           definition that a heartbeat can be unknown */
782                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
783                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
784                        /* if the interval is larger thatn mrhb we get NAN */
785                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
786                        (occu_pdp_st - proc_pdp_st <=
787                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
788                     pdp_temp[i] = DNAN;
789                 } else {
790                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
791                         / ((double) (occu_pdp_st - proc_pdp_st
792                                      -
793                                      rrd.pdp_prep[i].
794                                      scratch[PDP_unkn_sec_cnt].u_cnt)
795                            - pre_unknown);
796                 }
797
798                 /* process CDEF data sources; remember each CDEF DS can
799                  * only reference other DS with a lower index number */
800                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
801                     rpnp_t   *rpnp;
802
803                     rpnp =
804                         rpn_expand((rpn_cdefds_t *) &
805                                    (rrd.ds_def[i].par[DS_cdef]));
806                     /* substitue data values for OP_VARIABLE nodes */
807                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
808                         if (rpnp[ii].op == OP_VARIABLE) {
809                             rpnp[ii].op = OP_NUMBER;
810                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
811                         }
812                     }
813                     /* run the rpn calculator */
814                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
815                         free(rpnp);
816                         break;  /* exits the data sources pdp_temp loop */
817                     }
818                 }
819
820                 /* make pdp_prep ready for the next run */
821                 if (isnan(pdp_new[i])) {
822                     /* this is not realy accurate if we use subsecond data arival time
823                        should have thought of it when going subsecond resolution ...
824                        sorry next format change we will have it! */
825                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
826                         floor(post_int);
827                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
828                 } else {
829                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
830                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
831                         pdp_new[i] / interval * post_int;
832                 }
833
834 #ifdef DEBUG
835                 fprintf(stderr,
836                         "PDP UPD ds[%lu]\t"
837                         "pdp_temp %10.2f\t"
838                         "new_prep %10.2f\t"
839                         "new_unkn_sec %5lu\n",
840                         i, pdp_temp[i],
841                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
842                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
843 #endif
844             }
845
846             /* if there were errors during the last loop, bail out here */
847             if (rrd_test_error()) {
848                 free(step_start);
849                 break;
850             }
851
852             /* compute the number of elapsed pdp_st moments */
853             elapsed_pdp_st =
854                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
855 #ifdef DEBUG
856             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
857 #endif
858             if (rra_step_cnt == NULL) {
859                 rra_step_cnt = (unsigned long *)
860                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
861             }
862
863             for (i = 0, rra_start = rra_begin;
864                  i < rrd.stat_head->rra_cnt;
865                  rra_start +=
866                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
867                  sizeof(rrd_value_t), i++) {
868                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
869                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
870                     (proc_pdp_st / rrd.stat_head->pdp_step) %
871                     rrd.rra_def[i].pdp_cnt;
872                 if (start_pdp_offset <= elapsed_pdp_st) {
873                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
874                         rrd.rra_def[i].pdp_cnt + 1;
875                 } else {
876                     rra_step_cnt[i] = 0;
877                 }
878
879                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
880                     /* If this is a bulk update, we need to skip ahead in
881                        the seasonal arrays so that they will be correct for
882                        the next observed value;
883                        note that for the bulk update itself, no update will
884                        occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
885                        and DEVPREDICT will be set to DNAN. */
886                     if (rra_step_cnt[i] > 2) {
887                         /* skip update by resetting rra_step_cnt[i],
888                            note that this is not data source specific; this is
889                            due to the bulk update, not a DNAN value for the
890                            specific data source. */
891                         rra_step_cnt[i] = 0;
892                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
893                                         elapsed_pdp_st, &last_seasonal_coef);
894                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
895                                         elapsed_pdp_st + 1, &seasonal_coef);
896                     }
897
898                     /* periodically run a smoother for seasonal effects */
899                     /* Need to use first cdp parameter buffer to track
900                      * burnin (burnin requires a specific smoothing schedule).
901                      * The CDP_init_seasonal parameter is really an RRA level,
902                      * not a data source within RRA level parameter, but the rra_def
903                      * is read only for rrd_update (not flushed to disk). */
904                     iii = i * (rrd.stat_head->ds_cnt);
905                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
906                         <= BURNIN_CYCLES) {
907                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
908                             > rrd.rra_def[i].row_cnt - 1) {
909                             /* mark off one of the burnin cycles */
910                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
911                                u_cnt);
912                             schedule_smooth = 1;
913                         }
914                     } else {
915                         /* someone has no doubt invented a trick to deal with this
916                          * wrap around, but at least this code is clear. */
917                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
918                             u_cnt > rrd.rra_ptr[i].cur_row) {
919                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
920                              * mapping between PDP and CDP */
921                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
922                                 >=
923                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
924                                 u_cnt) {
925 #ifdef DEBUG
926                                 fprintf(stderr,
927                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
928                                         rrd.rra_ptr[i].cur_row,
929                                         elapsed_pdp_st,
930                                         rrd.rra_def[i].
931                                         par[RRA_seasonal_smooth_idx].u_cnt);
932 #endif
933                                 schedule_smooth = 1;
934                             }
935                         } else {
936                             /* can't rely on negative numbers because we are working with
937                              * unsigned values */
938                             /* Don't need modulus here. If we've wrapped more than once, only
939                              * one smooth is executed at the end. */
940                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
941                                 rrd.rra_def[i].row_cnt
942                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
943                                 rrd.rra_def[i].row_cnt >=
944                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
945                                 u_cnt) {
946 #ifdef DEBUG
947                                 fprintf(stderr,
948                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
949                                         rrd.rra_ptr[i].cur_row,
950                                         elapsed_pdp_st,
951                                         rrd.rra_def[i].
952                                         par[RRA_seasonal_smooth_idx].u_cnt);
953 #endif
954                                 schedule_smooth = 1;
955                             }
956                         }
957                     }
958
959                     rra_current = rrd_tell(rrd_file);
960                 }
961                 /* if cf is DEVSEASONAL or SEASONAL */
962                 if (rrd_test_error())
963                     break;
964
965                 /* update CDP_PREP areas */
966                 /* loop over data soures within each RRA */
967                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
968
969                     /* iii indexes the CDP prep area for this data source within the RRA */
970                     iii = i * rrd.stat_head->ds_cnt + ii;
971
972                     if (rrd.rra_def[i].pdp_cnt > 1) {
973
974                         if (rra_step_cnt[i] > 0) {
975                             /* If we are in this block, as least 1 CDP value will be written to
976                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
977                              * to be written, then the "fill in" value is the CDP_secondary_val
978                              * entry. */
979                             if (isnan(pdp_temp[ii])) {
980                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
981                                     u_cnt += start_pdp_offset;
982                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
983                                     u_val = DNAN;
984                             } else {
985                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
986                                  * CDP data entries. No matter the CF, the value is the same because
987                                  * the average, max, min, and last of a list of identical values is
988                                  * the same, namely, the value itself. */
989                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
990                                     u_val = pdp_temp[ii];
991                             }
992
993                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
994                                 u_cnt >
995                                 rrd.rra_def[i].pdp_cnt *
996                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
997                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
998                                     u_val = DNAN;
999                                 /* initialize carry over */
1000                                 if (current_cf == CF_AVERAGE) {
1001                                     if (isnan(pdp_temp[ii])) {
1002                                         rrd.cdp_prep[iii].scratch[CDP_val].
1003                                             u_val = DNAN;
1004                                     } else {
1005                                         rrd.cdp_prep[iii].scratch[CDP_val].
1006                                             u_val =
1007                                             pdp_temp[ii] *
1008                                             ((elapsed_pdp_st -
1009                                               start_pdp_offset) %
1010                                              rrd.rra_def[i].pdp_cnt);
1011                                     }
1012                                 } else {
1013                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1014                                         pdp_temp[ii];
1015                                 }
1016                             } else {
1017                                 rrd_value_t cum_val, cur_val;
1018
1019                                 switch (current_cf) {
1020                                 case CF_AVERAGE:
1021                                     cum_val =
1022                                         IFDNAN(rrd.cdp_prep[iii].
1023                                                scratch[CDP_val].u_val, 0.0);
1024                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1025                                     rrd.cdp_prep[iii].
1026                                         scratch[CDP_primary_val].u_val =
1027                                         (cum_val +
1028                                          cur_val * start_pdp_offset) /
1029                                         (rrd.rra_def[i].pdp_cnt -
1030                                          rrd.cdp_prep[iii].
1031                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1032                                     /* initialize carry over value */
1033                                     if (isnan(pdp_temp[ii])) {
1034                                         rrd.cdp_prep[iii].scratch[CDP_val].
1035                                             u_val = DNAN;
1036                                     } else {
1037                                         rrd.cdp_prep[iii].scratch[CDP_val].
1038                                             u_val =
1039                                             pdp_temp[ii] *
1040                                             ((elapsed_pdp_st -
1041                                               start_pdp_offset) %
1042                                              rrd.rra_def[i].pdp_cnt);
1043                                     }
1044                                     break;
1045                                 case CF_MAXIMUM:
1046                                     cum_val =
1047                                         IFDNAN(rrd.cdp_prep[iii].
1048                                                scratch[CDP_val].u_val, -DINF);
1049                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1050 #ifdef DEBUG
1051                                     if (isnan
1052                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1053                                          u_val) && isnan(pdp_temp[ii])) {
1054                                         fprintf(stderr,
1055                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1056                                                 i, ii);
1057                                         exit(-1);
1058                                     }
1059 #endif
1060                                     if (cur_val > cum_val)
1061                                         rrd.cdp_prep[iii].
1062                                             scratch[CDP_primary_val].u_val =
1063                                             cur_val;
1064                                     else
1065                                         rrd.cdp_prep[iii].
1066                                             scratch[CDP_primary_val].u_val =
1067                                             cum_val;
1068                                     /* initialize carry over value */
1069                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1070                                         pdp_temp[ii];
1071                                     break;
1072                                 case CF_MINIMUM:
1073                                     cum_val =
1074                                         IFDNAN(rrd.cdp_prep[iii].
1075                                                scratch[CDP_val].u_val, DINF);
1076                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1077 #ifdef DEBUG
1078                                     if (isnan
1079                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1080                                          u_val) && isnan(pdp_temp[ii])) {
1081                                         fprintf(stderr,
1082                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1083                                                 i, ii);
1084                                         exit(-1);
1085                                     }
1086 #endif
1087                                     if (cur_val < cum_val)
1088                                         rrd.cdp_prep[iii].
1089                                             scratch[CDP_primary_val].u_val =
1090                                             cur_val;
1091                                     else
1092                                         rrd.cdp_prep[iii].
1093                                             scratch[CDP_primary_val].u_val =
1094                                             cum_val;
1095                                     /* initialize carry over value */
1096                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1097                                         pdp_temp[ii];
1098                                     break;
1099                                 case CF_LAST:
1100                                 default:
1101                                     rrd.cdp_prep[iii].
1102                                         scratch[CDP_primary_val].u_val =
1103                                         pdp_temp[ii];
1104                                     /* initialize carry over value */
1105                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1106                                         pdp_temp[ii];
1107                                     break;
1108                                 }
1109                             }   /* endif meets xff value requirement for a valid value */
1110                             /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1111                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1112                             if (isnan(pdp_temp[ii]))
1113                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1114                                     u_cnt =
1115                                     (elapsed_pdp_st -
1116                                      start_pdp_offset) %
1117                                     rrd.rra_def[i].pdp_cnt;
1118                             else
1119                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1120                                     u_cnt = 0;
1121                         } else {    /* rra_step_cnt[i]  == 0 */
1122
1123 #ifdef DEBUG
1124                             if (isnan
1125                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1126                                 fprintf(stderr,
1127                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1128                                         i, ii);
1129                             } else {
1130                                 fprintf(stderr,
1131                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1132                                         i, ii,
1133                                         rrd.cdp_prep[iii].scratch[CDP_val].
1134                                         u_val);
1135                             }
1136 #endif
1137                             if (isnan(pdp_temp[ii])) {
1138                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1139                                     u_cnt += elapsed_pdp_st;
1140                             } else
1141                                 if (isnan
1142                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1143                                      u_val)) {
1144                                 if (current_cf == CF_AVERAGE) {
1145                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1146                                         pdp_temp[ii] * elapsed_pdp_st;
1147                                 } else {
1148                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1149                                         pdp_temp[ii];
1150                                 }
1151 #ifdef DEBUG
1152                                 fprintf(stderr,
1153                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1154                                         i, ii,
1155                                         rrd.cdp_prep[iii].scratch[CDP_val].
1156                                         u_val);
1157 #endif
1158                             } else {
1159                                 switch (current_cf) {
1160                                 case CF_AVERAGE:
1161                                     rrd.cdp_prep[iii].scratch[CDP_val].
1162                                         u_val +=
1163                                         pdp_temp[ii] * elapsed_pdp_st;
1164                                     break;
1165                                 case CF_MINIMUM:
1166                                     if (pdp_temp[ii] <
1167                                         rrd.cdp_prep[iii].scratch[CDP_val].
1168                                         u_val)
1169                                         rrd.cdp_prep[iii].scratch[CDP_val].
1170                                             u_val = pdp_temp[ii];
1171                                     break;
1172                                 case CF_MAXIMUM:
1173                                     if (pdp_temp[ii] >
1174                                         rrd.cdp_prep[iii].scratch[CDP_val].
1175                                         u_val)
1176                                         rrd.cdp_prep[iii].scratch[CDP_val].
1177                                             u_val = pdp_temp[ii];
1178                                     break;
1179                                 case CF_LAST:
1180                                 default:
1181                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1182                                         pdp_temp[ii];
1183                                     break;
1184                                 }
1185                             }
1186                         }
1187                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1188                         if (elapsed_pdp_st > 2) {
1189                             switch (current_cf) {
1190                             case CF_AVERAGE:
1191                             default:
1192                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1193                                     u_val = pdp_temp[ii];
1194                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1195                                     u_val = pdp_temp[ii];
1196                                 break;
1197                             case CF_SEASONAL:
1198                             case CF_DEVSEASONAL:
1199                                 /* need to update cached seasonal values, so they are consistent
1200                                  * with the bulk update */
1201                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1202                                  * CDP_last_deviation are the same. */
1203                                 rrd.cdp_prep[iii].
1204                                     scratch[CDP_hw_last_seasonal].u_val =
1205                                     last_seasonal_coef[ii];
1206                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1207                                     u_val = seasonal_coef[ii];
1208                                 break;
1209                             case CF_HWPREDICT:
1210                             case CF_MHWPREDICT:
1211                                 /* need to update the null_count and last_null_count.
1212                                  * even do this for non-DNAN pdp_temp because the
1213                                  * algorithm is not learning from batch updates. */
1214                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1215                                     u_cnt += elapsed_pdp_st;
1216                                 rrd.cdp_prep[iii].
1217                                     scratch[CDP_last_null_count].u_cnt +=
1218                                     elapsed_pdp_st - 1;
1219                                 /* fall through */
1220                             case CF_DEVPREDICT:
1221                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222                                     u_val = DNAN;
1223                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1224                                     u_val = DNAN;
1225                                 break;
1226                             case CF_FAILURES:
1227                                 /* do not count missed bulk values as failures */
1228                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1229                                     u_val = 0;
1230                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1231                                     u_val = 0;
1232                                 /* need to reset violations buffer.
1233                                  * could do this more carefully, but for now, just
1234                                  * assume a bulk update wipes away all violations. */
1235                                 erase_violations(&rrd, iii, i);
1236                                 break;
1237                             }
1238                         }
1239                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1240
1241                     if (rrd_test_error())
1242                         break;
1243
1244                 }       /* endif data sources loop */
1245             }           /* end RRA Loop */
1246
1247             /* this loop is only entered if elapsed_pdp_st < 3 */
1248             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1249                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1250                 for (i = 0, rra_start = rra_begin;
1251                      i < rrd.stat_head->rra_cnt;
1252                      rra_start +=
1253                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1254                      sizeof(rrd_value_t), i++) {
1255                     if (rrd.rra_def[i].pdp_cnt > 1)
1256                         continue;
1257
1258                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1259                     if (current_cf == CF_SEASONAL
1260                         || current_cf == CF_DEVSEASONAL) {
1261                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1262                                         elapsed_pdp_st + (scratch_idx ==
1263                                                           CDP_primary_val ? 1
1264                                                           : 2),
1265                                         &seasonal_coef);
1266                         rra_current = rrd_tell(rrd_file);
1267                     }
1268                     if (rrd_test_error())
1269                         break;
1270                     /* loop over data soures within each RRA */
1271                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1272                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1273                                            i * (rrd.stat_head->ds_cnt) + ii,
1274                                            i, ii, scratch_idx, seasonal_coef);
1275                     }
1276                 }       /* end RRA Loop */
1277                 if (rrd_test_error())
1278                     break;
1279             }           /* end elapsed_pdp_st loop */
1280
1281             if (rrd_test_error())
1282                 break;
1283
1284             /* Ready to write to disk */
1285             /* Move sequentially through the file, writing one RRA at a time.
1286              * Note this architecture divorces the computation of CDP with
1287              * flushing updated RRA entries to disk. */
1288             for (i = 0, rra_start = rra_begin;
1289                  i < rrd.stat_head->rra_cnt;
1290                  rra_start +=
1291                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1292                  sizeof(rrd_value_t), i++) {
1293                 /* is th5Aere anything to write for this RRA? If not, continue. */
1294                 if (rra_step_cnt[i] == 0)
1295                     continue;
1296
1297                 /* write the first row */
1298 #ifdef DEBUG
1299                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1300 #endif
1301                 rrd.rra_ptr[i].cur_row++;
1302                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1303                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1304                 /* positition on the first row */
1305                 rra_pos_tmp = rra_start +
1306                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1307                     sizeof(rrd_value_t);
1308                 if (rra_pos_tmp != rra_current) {
1309                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1310                         rrd_set_error("seek error in rrd");
1311                         break;
1312                     }
1313                     rra_current = rra_pos_tmp;
1314                 }
1315 #ifdef DEBUG
1316                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1317 #endif
1318                 scratch_idx = CDP_primary_val;
1319                 if (pcdp_summary != NULL) {
1320                     rra_time = (current_time - current_time
1321                                 % (rrd.rra_def[i].pdp_cnt *
1322                                    rrd.stat_head->pdp_step))
1323                         -
1324                         ((rra_step_cnt[i] -
1325                           1) * rrd.rra_def[i].pdp_cnt *
1326                          rrd.stat_head->pdp_step);
1327                 }
1328                 pcdp_summary =
1329                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1330                                   scratch_idx, pcdp_summary, &rra_time);
1331                 if (rrd_test_error())
1332                     break;
1333
1334                 /* write other rows of the bulk update, if any */
1335                 scratch_idx = CDP_secondary_val;
1336                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1337                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1338 #ifdef DEBUG
1339                         fprintf(stderr,
1340                                 "Wraparound for RRA %s, %lu updates left\n",
1341                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1342 #endif
1343                         /* wrap */
1344                         rrd.rra_ptr[i].cur_row = 0;
1345                         /* seek back to beginning of current rra */
1346                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1347                             rrd_set_error("seek error in rrd");
1348                             break;
1349                         }
1350 #ifdef DEBUG
1351                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1352                                 rrd_file->pos);
1353 #endif
1354                         rra_current = rra_start;
1355                     }
1356                     if (pcdp_summary != NULL) {
1357                         rra_time = (current_time - current_time
1358                                     % (rrd.rra_def[i].pdp_cnt *
1359                                        rrd.stat_head->pdp_step))
1360                             -
1361                             ((rra_step_cnt[i] -
1362                               2) * rrd.rra_def[i].pdp_cnt *
1363                              rrd.stat_head->pdp_step);
1364                     }
1365                     pcdp_summary =
1366                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1367                                       scratch_idx, pcdp_summary, &rra_time);
1368                 }
1369
1370                 if (rrd_test_error())
1371                     break;
1372             }           /* RRA LOOP */
1373
1374             /* break out of the argument parsing loop if error_string is set */
1375             if (rrd_test_error()) {
1376                 free(step_start);
1377                 break;
1378             }
1379
1380         }               /* endif a pdp_st has occurred */
1381         rrd.live_head->last_up = current_time;
1382         rrd.live_head->last_up_usec = current_time_usec;
1383         free(step_start);
1384     }                   /* function argument loop */
1385
1386     if (seasonal_coef != NULL)
1387         free(seasonal_coef);
1388     if (last_seasonal_coef != NULL)
1389         free(last_seasonal_coef);
1390     if (rra_step_cnt != NULL)
1391         free(rra_step_cnt);
1392     rpnstack_free(&rpnstack);
1393
1394 #if 0
1395     //rrd_flush(rrd_file);    //XXX: really needed?
1396 #endif
1397     /* if we got here and if there is an error and if the file has not been
1398      * written to, then close things up and return. */
1399     if (rrd_test_error()) {
1400         goto err_free_pdp_new;
1401     }
1402
1403     /* aargh ... that was tough ... so many loops ... anyway, its done.
1404      * we just need to write back the live header portion now*/
1405
1406     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1407                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1408                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1409                  SEEK_SET) != 0) {
1410         rrd_set_error("seek rrd for live header writeback");
1411         goto err_free_pdp_new;
1412     }
1413     /* for mmap, we did already write to the underlying mapping, so we do
1414        not need to write again.  */
1415 #ifndef HAVE_MMAP
1416     if (version >= 3) {
1417         if (rrd_write(rrd_file, rrd.live_head,
1418                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1419             rrd_set_error("rrd_write live_head to rrd");
1420             goto err_free_pdp_new;
1421         }
1422     } else {
1423         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1424                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1425             rrd_set_error("rrd_write live_head to rrd");
1426             goto err_free_pdp_new;
1427         }
1428     }
1429
1430
1431     if (rrd_write(rrd_file, rrd.pdp_prep,
1432                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1433         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1434         rrd_set_error("rrd_write pdp_prep to rrd");
1435         goto err_free_pdp_new;
1436     }
1437
1438     if (rrd_write(rrd_file, rrd.cdp_prep,
1439                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1440                   rrd.stat_head->ds_cnt)
1441         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1442                       rrd.stat_head->ds_cnt)) {
1443
1444         rrd_set_error("rrd_write cdp_prep to rrd");
1445         goto err_free_pdp_new;
1446     }
1447
1448     if (rrd_write(rrd_file, rrd.rra_ptr,
1449                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1450         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1451         rrd_set_error("rrd_write rra_ptr to rrd");
1452         goto err_free_pdp_new;
1453     }
1454 #endif
1455
1456     /* rrd_flush(rrd_file); */
1457
1458     /* calling the smoothing code here guarantees at most
1459      * one smoothing operation per rrd_update call. Unfortunately,
1460      * it is possible with bulk updates, or a long-delayed update
1461      * for smoothing to occur off-schedule. This really isn't
1462      * critical except during the burning cycles. */
1463     if (schedule_smooth) {
1464
1465         rra_start = rra_begin;
1466         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1467             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1468                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1469 #ifdef DEBUG
1470                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1471 #endif
1472                 apply_smoother(&rrd, i, rra_start, rrd_file);
1473                 if (rrd_test_error())
1474                     break;
1475             }
1476             rra_start += rrd.rra_def[i].row_cnt
1477                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1478         }
1479     }
1480
1481 /*    rrd_dontneed(rrd_file,&rrd); */
1482     rrd_free(&rrd);
1483     rrd_close(rrd_file);
1484
1485     free(pdp_new);
1486     free(tmpl_idx);
1487     free(pdp_temp);
1488     free(updvals);
1489     return (0);
1490
1491   err_free_pdp_new:
1492     free(pdp_new);
1493   err_free_tmpl_idx:
1494     free(tmpl_idx);
1495   err_free_pdp_temp:
1496     free(pdp_temp);
1497   err_free_updvals:
1498     free(updvals);
1499   err_close:
1500     rrd_close(rrd_file);
1501   err_free:
1502     rrd_free(&rrd);
1503   err_out:
1504     return (-1);
1505 }
1506
1507 /*
1508  * get exclusive lock to whole file.
1509  * lock gets removed when we close the file
1510  *
1511  * returns 0 on success
1512  */
1513 int LockRRD(
1514     int in_file)
1515 {
1516     int       rcstat;
1517
1518     {
1519 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1520         struct _stat st;
1521
1522         if (_fstat(in_file, &st) == 0) {
1523             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1524         } else {
1525             rcstat = -1;
1526         }
1527 #else
1528         struct flock lock;
1529
1530         lock.l_type = F_WRLCK;  /* exclusive write lock */
1531         lock.l_len = 0; /* whole file */
1532         lock.l_start = 0;   /* start of file */
1533         lock.l_whence = SEEK_SET;   /* end of file */
1534
1535         rcstat = fcntl(in_file, F_SETLK, &lock);
1536 #endif
1537     }
1538
1539     return (rcstat);
1540 }