Bernhard Fischer:
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
19
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
22
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 has neither gettimeofday() nor struct timeval. What follows is a
 * quick and dirty replacement for both.
 */
28 #include <sys/timeb.h>
29
#ifndef __MINGW32__
/* Local definition of struct timeval, used by the replacement
 * gettimeofday(); left out when __MINGW32__ is defined. */
struct timeval {
    time_t tv_sec;  /* whole seconds */
    long tv_usec;   /* microseconds */
};
#endif
36
/* Time-zone record; only used as the type of the second parameter of the
 * replacement gettimeofday(). */
struct __timezone {
    int tz_minuteswest; /* minutes west of Greenwich */
    int tz_dsttime;     /* type of DST correction */
};
41
42 static int gettimeofday(
43     struct timeval *t,
44     struct __timezone *tz)
45 {
46
47     struct _timeb current_time;
48
49     _ftime(&current_time);
50
51     t->tv_sec = current_time.time;
52     t->tv_usec = current_time.millitm * 1000;
53
54     return 0;
55 }
56
57 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the microsecond
 * part must never be negative.  A negative tv_usec is fixed by borrowing
 * one second from tv_sec; any other value is left untouched.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already in canonical form */

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
70
71 static inline info_t *write_RRA_row(
72     rrd_file_t *rrd_file,
73     rrd_t *rrd,
74     unsigned long rra_idx,
75     unsigned long *rra_current,
76     unsigned short CDP_scratch_idx,
77     info_t *pcdp_summary,
78     time_t *rra_time)
79 {
80     unsigned long ds_idx, cdp_idx;
81     infoval   iv;
82
83     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84         /* compute the cdp index */
85         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91         if (pcdp_summary != NULL) {
92             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93             /* append info to the return hash */
94             pcdp_summary = info_push(pcdp_summary,
95                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96                                                    *rra_time,
97                                                    rrd->rra_def[rra_idx].
98                                                    cf_nam,
99                                                    rrd->rra_def[rra_idx].
100                                                    pdp_cnt,
101                                                    rrd->ds_def[ds_idx].
102                                                    ds_nam), RD_I_VAL, iv);
103         }
104         if (rrd_write
105             (rrd_file,
106              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109             return 0;
110         }
111         *rra_current += sizeof(rrd_value_t);
112     }
113     return (pcdp_summary);
114 }
115
116 int       rrd_update_r(
117     const char *filename,
118     const char *tmplt,
119     int argc,
120     const char **argv);
121 int       _rrd_update(
122     const char *filename,
123     const char *tmplt,
124     int argc,
125     const char **argv,
126     info_t *);
127
/* Evaluates to Y when X is NaN and to X otherwise.  X may be evaluated
 * twice.  The expansion carries no trailing semicolon, so the macro can be
 * used inside larger expressions as well as in plain assignments. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
129
130
131 info_t   *rrd_update_v(
132     int argc,
133     char **argv)
134 {
135     char     *tmplt = NULL;
136     info_t   *result = NULL;
137     infoval   rc;
138
139     rc.u_int = -1;
140     optind = 0;
141     opterr = 0;         /* initialize getopt */
142
143     while (1) {
144         static struct option long_options[] = {
145             {"template", required_argument, 0, 't'},
146             {0, 0, 0, 0}
147         };
148         int       option_index = 0;
149         int       opt;
150
151         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
152
153         if (opt == EOF)
154             break;
155
156         switch (opt) {
157         case 't':
158             tmplt = optarg;
159             break;
160
161         case '?':
162             rrd_set_error("unknown option '%s'", argv[optind - 1]);
163             goto end_tag;
164         }
165     }
166
167     /* need at least 2 arguments: filename, data. */
168     if (argc - optind < 2) {
169         rrd_set_error("Not enough arguments");
170         goto end_tag;
171     }
172     rc.u_int = 0;
173     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174     rc.u_int = _rrd_update(argv[optind], tmplt,
175                            argc - optind - 1,
176                            (const char **) (argv + optind + 1), result);
177     result->value.u_int = rc.u_int;
178   end_tag:
179     return result;
180 }
181
182 int rrd_update(
183     int argc,
184     char **argv)
185 {
186     struct option long_options[] = {
187         {"template", required_argument, 0, 't'},
188         {0, 0, 0, 0}
189     };
190     int       option_index = 0;
191     int       opt;
192     char     *tmplt = NULL;
193     int       rc;
194
195     optind = 0;
196     opterr = 0;         /* initialize getopt */
197
198     while (1) {
199         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
200
201         if (opt == EOF)
202             break;
203
204         switch (opt) {
205         case 't':
206             tmplt = strdup(optarg);
207             break;
208
209         case '?':
210             rrd_set_error("unknown option '%s'", argv[optind - 1]);
211             return (-1);
212         }
213     }
214
215     /* need at least 2 arguments: filename, data. */
216     if (argc - optind < 2) {
217         rrd_set_error("Not enough arguments");
218
219         return -1;
220     }
221
222     rc = rrd_update_r(argv[optind], tmplt,
223                       argc - optind - 1, (const char **) (argv + optind + 1));
224     free(tmplt);
225     return rc;
226 }
227
228 int rrd_update_r(
229     const char *filename,
230     const char *tmplt,
231     int argc,
232     const char **argv)
233 {
234     return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
236
237 int _rrd_update(
238     const char *filename,
239     const char *tmplt,
240     int argc,
241     const char **argv,
242     info_t *pcdp_summary)
243 {
244
245     int       arg_i = 2;
246     short     j;
247     unsigned long i, ii, iii = 1;
248
249     unsigned long rra_begin;    /* byte pointer to the rra
250                                  * area in the rrd file.  this
251                                  * pointer never changes value */
252     unsigned long rra_start;    /* byte pointer to the rra
253                                  * area in the rrd file.  this
254                                  * pointer changes as each rrd is
255                                  * processed. */
256     unsigned long rra_current;  /* byte pointer to the current write
257                                  * spot in the rrd file. */
258     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
259     double    interval, pre_int, post_int;  /* interval between this and
260                                              * the last run */
261     unsigned long proc_pdp_st;  /* which pdp_st was the last
262                                  * to be processed */
263     unsigned long occu_pdp_st;  /* when was the pdp_st
264                                  * before the last update
265                                  * time */
266     unsigned long proc_pdp_age; /* how old was the data in
267                                  * the pdp prep area when it
268                                  * was last updated */
269     unsigned long occu_pdp_age; /* how long ago was the last
270                                  * pdp_step time */
271     rrd_value_t *pdp_new;   /* prepare the incoming data
272                              * to be added the the
273                              * existing entry */
274     rrd_value_t *pdp_temp;  /* prepare the pdp values
275                              * to be added the the
276                              * cdp values */
277
278     long     *tmpl_idx; /* index representing the settings
279                            transported by the tmplt index */
280     unsigned long tmpl_cnt = 2; /* time and data */
281
282     rrd_t     rrd;
283     time_t    current_time = 0;
284     time_t    rra_time = 0; /* time of update for a RRA */
285     unsigned long current_time_usec = 0;    /* microseconds part of current time */
286     struct timeval tmp_time;    /* used for time conversion */
287
288     char    **updvals;
289     int       schedule_smooth = 0;
290     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
291
292     /* a vector of future Holt-Winters seasonal coefs */
293     unsigned long elapsed_pdp_st;
294
295     /* number of elapsed PDP steps since last update */
296     unsigned long *rra_step_cnt = NULL;
297
298     /* number of rows to be updated in an RRA for a data
299      * value. */
300     unsigned long start_pdp_offset;
301
302     /* number of PDP steps since the last update that
303      * are assigned to the first CDP to be generated
304      * since the last update. */
305     unsigned short scratch_idx;
306
307     /* index into the CDP scratch array */
308     enum cf_en current_cf;
309
310     /* numeric id of the current consolidation function */
311     rpnstack_t rpnstack;    /* used for COMPUTE DS */
312     int       version;  /* rrd version */
313     char     *endptr;   /* used in the conversion */
314     rrd_file_t *rrd_file;
315
316     rpnstack_init(&rpnstack);
317
318     /* need at least 1 arguments: data. */
319     if (argc < 1) {
320         rrd_set_error("Not enough arguments");
321         goto err_out;
322     }
323
324     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325     if (rrd_file == NULL) {
326         goto err_free;
327     }
328     /* We are now at the beginning of the rra's */
329     rra_current = rra_start = rra_begin = rrd_file->header_len;
330
331     /* initialize time */
332     version = atoi(rrd.stat_head->version);
333     gettimeofday(&tmp_time, 0);
334     normalize_time(&tmp_time);
335     current_time = tmp_time.tv_sec;
336     if (version >= 3) {
337         current_time_usec = tmp_time.tv_usec;
338     } else {
339         current_time_usec = 0;
340     }
341
342     /* get exclusive lock to whole file.
343      * lock gets removed when we close the file.
344      */
345     if (LockRRD(rrd_file->fd) != 0) {
346         rrd_set_error("could not lock RRD");
347         goto err_close;
348     }
349
350     if ((updvals =
351          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352         rrd_set_error("allocating updvals pointer array");
353         goto err_close;
354     }
355
356     if ((pdp_temp = malloc(sizeof(rrd_value_t)
357                            * rrd.stat_head->ds_cnt)) == NULL) {
358         rrd_set_error("allocating pdp_temp ...");
359         goto err_free_updvals;
360     }
361
362     if ((tmpl_idx = malloc(sizeof(unsigned long)
363                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364         rrd_set_error("allocating tmpl_idx ...");
365         goto err_free_pdp_temp;
366     }
367     /* initialize tmplt redirector */
368     /* default config example (assume DS 1 is a CDEF DS)
369        tmpl_idx[0] -> 0; (time)
370        tmpl_idx[1] -> 1; (DS 0)
371        tmpl_idx[2] -> 3; (DS 2)
372        tmpl_idx[3] -> 4; (DS 3) */
373     tmpl_idx[0] = 0;    /* time */
374     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
376             tmpl_idx[ii++] = i;
377     }
378     tmpl_cnt = ii;
379
380     if (tmplt) {
381         /* we should work on a writeable copy here */
382         char     *dsname;
383         unsigned int tmpl_len;
384         char     *tmplt_copy = strdup(tmplt);
385
386         dsname = tmplt_copy;
387         tmpl_cnt = 1;   /* the first entry is the time */
388         tmpl_len = strlen(tmplt_copy);
389         for (i = 0; i <= tmpl_len; i++) {
390             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391                 tmplt_copy[i] = '\0';
392                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
393                     rrd_set_error
394                         ("tmplt contains more DS definitions than RRD");
395                     goto err_free_tmpl_idx;
396                 }
397                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398                     rrd_set_error("unknown DS name '%s'", dsname);
399                     goto err_free_tmpl_idx;
400                 } else {
401                     /* the first element is always the time */
402                     tmpl_idx[tmpl_cnt - 1]++;
403                     /* go to the next entry on the tmplt_copy */
404                     dsname = &tmplt_copy[i + 1];
405                     /* fix the damage we did before */
406                     if (i < tmpl_len) {
407                         tmplt_copy[i] = ':';
408                     }
409
410                 }
411             }
412         }
413         free(tmplt_copy);
414     }
415     if ((pdp_new = malloc(sizeof(rrd_value_t)
416                           * rrd.stat_head->ds_cnt)) == NULL) {
417         rrd_set_error("allocating pdp_new ...");
418         goto err_free_tmpl_idx;
419     }
420     /* loop through the arguments. */
421     for (arg_i = 0; arg_i < argc; arg_i++) {
422         char     *stepper = strdup(argv[arg_i]);
423         char     *step_start = stepper;
424         char     *p;
425         char     *parsetime_error = NULL;
426         enum { atstyle, normal } timesyntax;
427         struct rrd_time_value ds_tv;
428
429         if (stepper == NULL) {
430             rrd_set_error("failed duplication argv entry");
431             free(step_start);
432             goto err_free_pdp_new;
433         }
434         /* initialize all ds input to unknown except the first one
435            which has always got to be set */
436         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
437             updvals[ii] = "U";
438         updvals[0] = stepper;
439         /* separate all ds elements; first must be examined separately
440            due to alternate time syntax */
441         if ((p = strchr(stepper, '@')) != NULL) {
442             timesyntax = atstyle;
443             *p = '\0';
444             stepper = p + 1;
445         } else if ((p = strchr(stepper, ':')) != NULL) {
446             timesyntax = normal;
447             *p = '\0';
448             stepper = p + 1;
449         } else {
450             rrd_set_error
451                 ("expected timestamp not found in data source from %s",
452                  argv[arg_i]);
453             free(step_start);
454             break;
455         }
456         ii = 1;
457         updvals[tmpl_idx[ii]] = stepper;
458         while (*stepper) {
459             if (*stepper == ':') {
460                 *stepper = '\0';
461                 ii++;
462                 if (ii < tmpl_cnt) {
463                     updvals[tmpl_idx[ii]] = stepper + 1;
464                 }
465             }
466             stepper++;
467         }
468
469         if (ii != tmpl_cnt - 1) {
470             rrd_set_error
471                 ("expected %lu data source readings (got %lu) from %s",
472                  tmpl_cnt - 1, ii, argv[arg_i]);
473             free(step_start);
474             break;
475         }
476
477         /* get the time from the reading ... handle N */
478         if (timesyntax == atstyle) {
479             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
480                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
481                 free(step_start);
482                 break;
483             }
484             if (ds_tv.type == RELATIVE_TO_END_TIME ||
485                 ds_tv.type == RELATIVE_TO_START_TIME) {
486                 rrd_set_error("specifying time relative to the 'start' "
487                               "or 'end' makes no sense here: %s", updvals[0]);
488                 free(step_start);
489                 break;
490             }
491
492             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
493
494             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
495
496         } else if (strcmp(updvals[0], "N") == 0) {
497             gettimeofday(&tmp_time, 0);
498             normalize_time(&tmp_time);
499             current_time = tmp_time.tv_sec;
500             current_time_usec = tmp_time.tv_usec;
501         } else {
502             double    tmp;
503
504             tmp = strtod(updvals[0], 0);
505             current_time = floor(tmp);
506             current_time_usec =
507                 (long) ((tmp - (double) current_time) * 1000000.0);
508         }
509         /* dont do any correction for old version RRDs */
510         if (version < 3)
511             current_time_usec = 0;
512
513         if (current_time < rrd.live_head->last_up ||
514             (current_time == rrd.live_head->last_up &&
515              (long) current_time_usec <=
516              (long) rrd.live_head->last_up_usec)) {
517             rrd_set_error("illegal attempt to update using time %ld when "
518                           "last update time is %ld (minimum one second step)",
519                           current_time, rrd.live_head->last_up);
520             free(step_start);
521             break;
522         }
523
524         /* seek to the beginning of the rra's */
525         if (rra_current != rra_begin) {
526 #ifndef HAVE_MMAP
527             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
528                 rrd_set_error("seek error in rrd");
529                 free(step_start);
530                 break;
531             }
532 #endif
533             rra_current = rra_begin;
534         }
535         rra_start = rra_begin;
536
537         /* when was the current pdp started */
538         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
539         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
540
541         /* when did the last pdp_st occur */
542         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
543         occu_pdp_st = current_time - occu_pdp_age;
544
545         /* interval = current_time - rrd.live_head->last_up; */
546         interval = (double) (current_time - rrd.live_head->last_up)
547             + (double) ((long) current_time_usec -
548                         (long) rrd.live_head->last_up_usec) / 1000000.0;
549
550         if (occu_pdp_st > proc_pdp_st) {
551             /* OK we passed the pdp_st moment */
552             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
553                                                                      * occurred before the latest
554                                                                      * pdp_st moment*/
555             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
556             post_int = occu_pdp_age;    /* how much after it */
557             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
558         } else {
559             pre_int = interval;
560             post_int = 0;
561         }
562
563 #ifdef DEBUG
564         printf("proc_pdp_age %lu\t"
565                "proc_pdp_st %lu\t"
566                "occu_pfp_age %lu\t"
567                "occu_pdp_st %lu\t"
568                "int %lf\t"
569                "pre_int %lf\t"
570                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
571                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
572 #endif
573
574         /* process the data sources and update the pdp_prep 
575          * area accordingly */
576         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
577             enum dst_en dst_idx;
578
579             dst_idx = dst_conv(rrd.ds_def[i].dst);
580
581             /* make sure we do not build diffs with old last_ds values */
582             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
583                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
584                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
585             }
586
587             /* NOTE: DST_CDEF should never enter this if block, because
588              * updvals[i+1][0] is initialized to 'U'; unless the caller
589              * accidently specified a value for the DST_CDEF. To handle
590              * this case, an extra check is required. */
591
592             if ((updvals[i + 1][0] != 'U') &&
593                 (dst_idx != DST_CDEF) &&
594                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
595                 double    rate = DNAN;
596
597                 /* the data source type defines how to process the data */
598                 /* pdp_new contains rate * time ... eg the bytes
599                  * transferred during the interval. Doing it this way saves
600                  * a lot of math operations */
601                 switch (dst_idx) {
602                 case DST_COUNTER:
603                 case DST_DERIVE:
604                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
605                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
606                             if ((updvals[i + 1][ii] < '0'
607                                  || updvals[i + 1][ii] > '9') && (ii != 0
608                                                                   && updvals[i
609                                                                              +
610                                                                              1]
611                                                                   [ii] !=
612                                                                   '-')) {
613                                 rrd_set_error("not a simple integer: '%s'",
614                                               updvals[i + 1]);
615                                 break;
616                             }
617                         }
618                         if (rrd_test_error()) {
619                             break;
620                         }
621                         pdp_new[i] =
622                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
623                         if (dst_idx == DST_COUNTER) {
624                             /* simple overflow catcher suggested by Andres Kroonmaa */
625                             /* this will fail terribly for non 32 or 64 bit counters ... */
626                             /* are there any others in SNMP land ? */
627                             if (pdp_new[i] < (double) 0.0)
628                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
629                             if (pdp_new[i] < (double) 0.0)
630                                 pdp_new[i] += (double) 18446744069414584320.0;
631                             /* 2^64-2^32 */ ;
632                         }
633                         rate = pdp_new[i] / interval;
634                     } else {
635                         pdp_new[i] = DNAN;
636                     }
637                     break;
638                 case DST_ABSOLUTE:
639                     errno = 0;
640                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
641                     if (errno > 0) {
642                         rrd_set_error("converting '%s' to float: %s",
643                                       updvals[i + 1], rrd_strerror(errno));
644                         break;
645                     };
646                     if (endptr[0] != '\0') {
647                         rrd_set_error
648                             ("conversion of '%s' to float not complete: tail '%s'",
649                              updvals[i + 1], endptr);
650                         break;
651                     }
652                     rate = pdp_new[i] / interval;
653                     break;
654                 case DST_GAUGE:
655                     errno = 0;
656                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
657                     if (errno > 0) {
658                         rrd_set_error("converting '%s' to float: %s",
659                                       updvals[i + 1], rrd_strerror(errno));
660                         break;
661                     };
662                     if (endptr[0] != '\0') {
663                         rrd_set_error
664                             ("conversion of '%s' to float not complete: tail '%s'",
665                              updvals[i + 1], endptr);
666                         break;
667                     }
668                     rate = pdp_new[i] / interval;
669                     break;
670                 default:
671                     rrd_set_error("rrd contains unknown DS type : '%s'",
672                                   rrd.ds_def[i].dst);
673                     break;
674                 }
675                 /* break out of this for loop if the error string is set */
676                 if (rrd_test_error()) {
677                     break;
678                 }
679                 /* make sure pdp_temp is neither too large or too small
680                  * if any of these occur it becomes unknown ...
681                  * sorry folks ... */
682                 if (!isnan(rate) &&
683                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
685                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
687                     pdp_new[i] = DNAN;
688                 }
689             } else {
690                 /* no news is news all the same */
691                 pdp_new[i] = DNAN;
692             }
693
694
695             /* make a copy of the command line argument for the next run */
696 #ifdef DEBUG
697             fprintf(stderr,
698                     "prep ds[%lu]\t"
699                     "last_arg '%s'\t"
700                     "this_arg '%s'\t"
701                     "pdp_new %10.2f\n",
702                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
703 #endif
704             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
705             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
706         }
707         /* break out of the argument parsing loop if the error_string is set */
708         if (rrd_test_error()) {
709             free(step_start);
710             break;
711         }
712         /* has a pdp_st moment occurred since the last run ? */
713
714         if (proc_pdp_st == occu_pdp_st) {
715             /* no we have not passed a pdp_st moment. therefore update is simple */
716
717             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
718                 if (isnan(pdp_new[i])) {
719                     /* this is not realy accurate if we use subsecond data arival time
720                        should have thought of it when going subsecond resolution ...
721                        sorry next format change we will have it! */
722                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
723                         floor(interval);
724                 } else {
725                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
726                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
727                     } else {
728                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
729                     }
730                 }
731 #ifdef DEBUG
732                 fprintf(stderr,
733                         "NO PDP  ds[%lu]\t"
734                         "value %10.2f\t"
735                         "unkn_sec %5lu\n",
736                         i,
737                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
738                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740             }
741         } else {
742             /* an pdp_st has occurred. */
743
744             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
745                rate*seconds which occurred up to the last run.
746                pdp_new[] contains rate*seconds from the latest run.
747                pdp_temp[] will contain the rate for cdp */
748
749             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
750                 /* update pdp_prep to the current pdp_st. */
751                 double    pre_unknown = 0.0;
752
753                 if (isnan(pdp_new[i])) {
754                     /* a final bit of unkonwn to be added bevore calculation
755                        we use a temporary variable for this so that we
756                        don't have to turn integer lines before using the value */
757                     pre_unknown = pre_int;
758                 } else {
759                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
760                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
761                             pdp_new[i] / interval * pre_int;
762                     } else {
763                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764                             pdp_new[i] / interval * pre_int;
765                     }
766                 }
767
768
769                 /* if too much of the pdp_prep is unknown we dump it */
770                 if (
771                        /* removed because this does not agree with the
772                           definition that a heartbeat can be unknown */
773                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775                        /* if the interval is larger thatn mrhb we get NAN */
776                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777                        (occu_pdp_st - proc_pdp_st <=
778                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779                     pdp_temp[i] = DNAN;
780                 } else {
781                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782                         / ((double) (occu_pdp_st - proc_pdp_st
783                                      -
784                                      rrd.pdp_prep[i].
785                                      scratch[PDP_unkn_sec_cnt].u_cnt)
786                            - pre_unknown);
787                 }
788
789                 /* process CDEF data sources; remember each CDEF DS can
790                  * only reference other DS with a lower index number */
791                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
792                     rpnp_t   *rpnp;
793
794                     rpnp =
795                         rpn_expand((rpn_cdefds_t *) &
796                                    (rrd.ds_def[i].par[DS_cdef]));
797                     /* substitute data values for OP_VARIABLE nodes */
798                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
799                         if (rpnp[ii].op == OP_VARIABLE) {
800                             rpnp[ii].op = OP_NUMBER;
801                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
802                         }
803                     }
804                     /* run the rpn calculator */
805                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
806                         free(rpnp);
807                         break;  /* exits the data sources pdp_temp loop */
808                     }
809                 }
810
811                 /* make pdp_prep ready for the next run */
812                 if (isnan(pdp_new[i])) {
813                     /* this is not really accurate if we use subsecond data arrival time
814                        should have thought of it when going subsecond resolution ...
815                        sorry next format change we will have it! */
816                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
817                         floor(post_int);
818                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819                 } else {
820                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
822                         pdp_new[i] / interval * post_int;
823                 }
824
825 #ifdef DEBUG
826                 fprintf(stderr,
827                         "PDP UPD ds[%lu]\t"
828                         "pdp_temp %10.2f\t"
829                         "new_prep %10.2f\t"
830                         "new_unkn_sec %5lu\n",
831                         i, pdp_temp[i],
832                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
833                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835             }
836
837             /* if there were errors during the last loop, bail out here */
838             if (rrd_test_error()) {
839                 free(step_start);
840                 break;
841             }
842
843             /* compute the number of elapsed pdp_st moments */
844             elapsed_pdp_st =
845                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
846 #ifdef DEBUG
847             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
848 #endif
849             if (rra_step_cnt == NULL) {
850                 rra_step_cnt = (unsigned long *)
851                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
852             }
853
854             for (i = 0, rra_start = rra_begin;
855                  i < rrd.stat_head->rra_cnt;
856                  rra_start +=
857                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
858                  sizeof(rrd_value_t), i++) {
859                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861                     (proc_pdp_st / rrd.stat_head->pdp_step) %
862                     rrd.rra_def[i].pdp_cnt;
863                 if (start_pdp_offset <= elapsed_pdp_st) {
864                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
865                         rrd.rra_def[i].pdp_cnt + 1;
866                 } else {
867                     rra_step_cnt[i] = 0;
868                 }
869
870                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
871                     /* If this is a bulk update, we need to skip ahead in
872                        the seasonal arrays so that they will be correct for
873                        the next observed value;
874                        note that for the bulk update itself, no update will
875                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
876                        and DEVPREDICT will be set to DNAN. */
877                     if (rra_step_cnt[i] > 2) {
878                         /* skip update by resetting rra_step_cnt[i],
879                            note that this is not data source specific; this is
880                            due to the bulk update, not a DNAN value for the
881                            specific data source. */
882                         rra_step_cnt[i] = 0;
883                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
884                                         elapsed_pdp_st, &last_seasonal_coef);
885                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
886                                         elapsed_pdp_st + 1, &seasonal_coef);
887                     }
888
889                     /* periodically run a smoother for seasonal effects */
890                     /* Need to use first cdp parameter buffer to track
891                      * burnin (burnin requires a specific smoothing schedule).
892                      * The CDP_init_seasonal parameter is really an RRA level,
893                      * not a data source within RRA level parameter, but the rra_def
894                      * is read only for rrd_update (not flushed to disk). */
895                     iii = i * (rrd.stat_head->ds_cnt);
896                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897                         <= BURNIN_CYCLES) {
898                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
899                             > rrd.rra_def[i].row_cnt - 1) {
900                             /* mark off one of the burnin cycles */
901                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
902                                u_cnt);
903                             schedule_smooth = 1;
904                         }
905                     } else {
906                         /* someone has no doubt invented a trick to deal with this
907                          * wrap around, but at least this code is clear. */
908                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
909                             u_cnt > rrd.rra_ptr[i].cur_row) {
910                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
911                              * mapping between PDP and CDP */
912                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
913                                 >=
914                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
915                                 u_cnt) {
916 #ifdef DEBUG
917                                 fprintf(stderr,
918                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919                                         rrd.rra_ptr[i].cur_row,
920                                         elapsed_pdp_st,
921                                         rrd.rra_def[i].
922                                         par[RRA_seasonal_smooth_idx].u_cnt);
923 #endif
924                                 schedule_smooth = 1;
925                             }
926                         } else {
927                             /* can't rely on negative numbers because we are working with
928                              * unsigned values */
929                             /* Don't need modulus here. If we've wrapped more than once, only
930                              * one smooth is executed at the end. */
931                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
932                                 rrd.rra_def[i].row_cnt
933                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
934                                 rrd.rra_def[i].row_cnt >=
935                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
936                                 u_cnt) {
937 #ifdef DEBUG
938                                 fprintf(stderr,
939                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
940                                         rrd.rra_ptr[i].cur_row,
941                                         elapsed_pdp_st,
942                                         rrd.rra_def[i].
943                                         par[RRA_seasonal_smooth_idx].u_cnt);
944 #endif
945                                 schedule_smooth = 1;
946                             }
947                         }
948                     }
949
950                     rra_current = rrd_tell(rrd_file);
951                 }
952                 /* if cf is DEVSEASONAL or SEASONAL */
953                 if (rrd_test_error())
954                     break;
955
956                 /* update CDP_PREP areas */
957                 /* loop over data sources within each RRA */
958                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
959
960                     /* iii indexes the CDP prep area for this data source within the RRA */
961                     iii = i * rrd.stat_head->ds_cnt + ii;
962
963                     if (rrd.rra_def[i].pdp_cnt > 1) {
964
965                         if (rra_step_cnt[i] > 0) {
966                             /* If we are in this block, at least 1 CDP value will be written to
967                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
968                              * to be written, then the "fill in" value is the CDP_secondary_val
969                              * entry. */
970                             if (isnan(pdp_temp[ii])) {
971                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
972                                     u_cnt += start_pdp_offset;
973                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
974                                     u_val = DNAN;
975                             } else {
976                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
977                                  * CDP data entries. No matter the CF, the value is the same because
978                                  * the average, max, min, and last of a list of identical values is
979                                  * the same, namely, the value itself. */
980                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
981                                     u_val = pdp_temp[ii];
982                             }
983
984                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
985                                 u_cnt >
986                                 rrd.rra_def[i].pdp_cnt *
987                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
988                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
989                                     u_val = DNAN;
990                                 /* initialize carry over */
991                                 if (current_cf == CF_AVERAGE) {
992                                     if (isnan(pdp_temp[ii])) {
993                                         rrd.cdp_prep[iii].scratch[CDP_val].
994                                             u_val = DNAN;
995                                     } else {
996                                         rrd.cdp_prep[iii].scratch[CDP_val].
997                                             u_val =
998                                             pdp_temp[ii] *
999                                             ((elapsed_pdp_st -
1000                                               start_pdp_offset) %
1001                                              rrd.rra_def[i].pdp_cnt);
1002                                     }
1003                                 } else {
1004                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1005                                         pdp_temp[ii];
1006                                 }
1007                             } else {
1008                                 rrd_value_t cum_val, cur_val;
1009
1010                                 switch (current_cf) {
1011                                 case CF_AVERAGE:
1012                                     cum_val =
1013                                         IFDNAN(rrd.cdp_prep[iii].
1014                                                scratch[CDP_val].u_val, 0.0);
1015                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1016                                     rrd.cdp_prep[iii].
1017                                         scratch[CDP_primary_val].u_val =
1018                                         (cum_val +
1019                                          cur_val * start_pdp_offset) /
1020                                         (rrd.rra_def[i].pdp_cnt -
1021                                          rrd.cdp_prep[iii].
1022                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1023                                     /* initialize carry over value */
1024                                     if (isnan(pdp_temp[ii])) {
1025                                         rrd.cdp_prep[iii].scratch[CDP_val].
1026                                             u_val = DNAN;
1027                                     } else {
1028                                         rrd.cdp_prep[iii].scratch[CDP_val].
1029                                             u_val =
1030                                             pdp_temp[ii] *
1031                                             ((elapsed_pdp_st -
1032                                               start_pdp_offset) %
1033                                              rrd.rra_def[i].pdp_cnt);
1034                                     }
1035                                     break;
1036                                 case CF_MAXIMUM:
1037                                     cum_val =
1038                                         IFDNAN(rrd.cdp_prep[iii].
1039                                                scratch[CDP_val].u_val, -DINF);
1040                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1041 #ifdef DEBUG
1042                                     if (isnan
1043                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1044                                          u_val) && isnan(pdp_temp[ii])) {
1045                                         fprintf(stderr,
1046                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1047                                                 i, ii);
1048                                         exit(-1);
1049                                     }
1050 #endif
1051                                     if (cur_val > cum_val)
1052                                         rrd.cdp_prep[iii].
1053                                             scratch[CDP_primary_val].u_val =
1054                                             cur_val;
1055                                     else
1056                                         rrd.cdp_prep[iii].
1057                                             scratch[CDP_primary_val].u_val =
1058                                             cum_val;
1059                                     /* initialize carry over value */
1060                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1061                                         pdp_temp[ii];
1062                                     break;
1063                                 case CF_MINIMUM:
1064                                     cum_val =
1065                                         IFDNAN(rrd.cdp_prep[iii].
1066                                                scratch[CDP_val].u_val, DINF);
1067                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1068 #ifdef DEBUG
1069                                     if (isnan
1070                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1071                                          u_val) && isnan(pdp_temp[ii])) {
1072                                         fprintf(stderr,
1073                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1074                                                 i, ii);
1075                                         exit(-1);
1076                                     }
1077 #endif
1078                                     if (cur_val < cum_val)
1079                                         rrd.cdp_prep[iii].
1080                                             scratch[CDP_primary_val].u_val =
1081                                             cur_val;
1082                                     else
1083                                         rrd.cdp_prep[iii].
1084                                             scratch[CDP_primary_val].u_val =
1085                                             cum_val;
1086                                     /* initialize carry over value */
1087                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1088                                         pdp_temp[ii];
1089                                     break;
1090                                 case CF_LAST:
1091                                 default:
1092                                     rrd.cdp_prep[iii].
1093                                         scratch[CDP_primary_val].u_val =
1094                                         pdp_temp[ii];
1095                                     /* initialize carry over value */
1096                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1097                                         pdp_temp[ii];
1098                                     break;
1099                                 }
1100                             }   /* endif meets xff value requirement for a valid value */
1101                             /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1102                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1103                             if (isnan(pdp_temp[ii]))
1104                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1105                                     u_cnt =
1106                                     (elapsed_pdp_st -
1107                                      start_pdp_offset) %
1108                                     rrd.rra_def[i].pdp_cnt;
1109                             else
1110                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1111                                     u_cnt = 0;
1112                         } else {    /* rra_step_cnt[i]  == 0 */
1113
1114 #ifdef DEBUG
1115                             if (isnan
1116                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1117                                 fprintf(stderr,
1118                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1119                                         i, ii);
1120                             } else {
1121                                 fprintf(stderr,
1122                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1123                                         i, ii,
1124                                         rrd.cdp_prep[iii].scratch[CDP_val].
1125                                         u_val);
1126                             }
1127 #endif
1128                             if (isnan(pdp_temp[ii])) {
1129                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1130                                     u_cnt += elapsed_pdp_st;
1131                             } else
1132                                 if (isnan
1133                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1134                                      u_val)) {
1135                                 if (current_cf == CF_AVERAGE) {
1136                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1137                                         pdp_temp[ii] * elapsed_pdp_st;
1138                                 } else {
1139                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1140                                         pdp_temp[ii];
1141                                 }
1142 #ifdef DEBUG
1143                                 fprintf(stderr,
1144                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1145                                         i, ii,
1146                                         rrd.cdp_prep[iii].scratch[CDP_val].
1147                                         u_val);
1148 #endif
1149                             } else {
1150                                 switch (current_cf) {
1151                                 case CF_AVERAGE:
1152                                     rrd.cdp_prep[iii].scratch[CDP_val].
1153                                         u_val +=
1154                                         pdp_temp[ii] * elapsed_pdp_st;
1155                                     break;
1156                                 case CF_MINIMUM:
1157                                     if (pdp_temp[ii] <
1158                                         rrd.cdp_prep[iii].scratch[CDP_val].
1159                                         u_val)
1160                                         rrd.cdp_prep[iii].scratch[CDP_val].
1161                                             u_val = pdp_temp[ii];
1162                                     break;
1163                                 case CF_MAXIMUM:
1164                                     if (pdp_temp[ii] >
1165                                         rrd.cdp_prep[iii].scratch[CDP_val].
1166                                         u_val)
1167                                         rrd.cdp_prep[iii].scratch[CDP_val].
1168                                             u_val = pdp_temp[ii];
1169                                     break;
1170                                 case CF_LAST:
1171                                 default:
1172                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1173                                         pdp_temp[ii];
1174                                     break;
1175                                 }
1176                             }
1177                         }
1178                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1179                         if (elapsed_pdp_st > 2) {
1180                             switch (current_cf) {
1181                             case CF_AVERAGE:
1182                             default:
1183                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1184                                     u_val = pdp_temp[ii];
1185                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1186                                     u_val = pdp_temp[ii];
1187                                 break;
1188                             case CF_SEASONAL:
1189                             case CF_DEVSEASONAL:
1190                                 /* need to update cached seasonal values, so they are consistent
1191                                  * with the bulk update */
1192                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1193                                  * CDP_last_deviation are the same. */
1194                                 rrd.cdp_prep[iii].
1195                                     scratch[CDP_hw_last_seasonal].u_val =
1196                                     last_seasonal_coef[ii];
1197                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1198                                     u_val = seasonal_coef[ii];
1199                                 break;
1200                             case CF_HWPREDICT:
1201                                 /* need to update the null_count and last_null_count.
1202                                  * even do this for non-DNAN pdp_temp because the
1203                                  * algorithm is not learning from batch updates. */
1204                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1205                                     u_cnt += elapsed_pdp_st;
1206                                 rrd.cdp_prep[iii].
1207                                     scratch[CDP_last_null_count].u_cnt +=
1208                                     elapsed_pdp_st - 1;
1209                                 /* fall through */
1210                             case CF_DEVPREDICT:
1211                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1212                                     u_val = DNAN;
1213                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1214                                     u_val = DNAN;
1215                                 break;
1216                             case CF_FAILURES:
1217                                 /* do not count missed bulk values as failures */
1218                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1219                                     u_val = 0;
1220                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1221                                     u_val = 0;
1222                                 /* need to reset violations buffer.
1223                                  * could do this more carefully, but for now, just
1224                                  * assume a bulk update wipes away all violations. */
1225                                 erase_violations(&rrd, iii, i);
1226                                 break;
1227                             }
1228                         }
1229                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1230
1231                     if (rrd_test_error())
1232                         break;
1233
1234                 }       /* endif data sources loop */
1235             }           /* end RRA Loop */
1236
1237             /* this loop is only entered if elapsed_pdp_st < 3 */
1238             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1239                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1240                 for (i = 0, rra_start = rra_begin;
1241                      i < rrd.stat_head->rra_cnt;
1242                      rra_start +=
1243                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1244                      sizeof(rrd_value_t), i++) {
1245                     if (rrd.rra_def[i].pdp_cnt > 1)
1246                         continue;
1247
1248                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1249                     if (current_cf == CF_SEASONAL
1250                         || current_cf == CF_DEVSEASONAL) {
1251                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1252                                         elapsed_pdp_st + (scratch_idx ==
1253                                                           CDP_primary_val ? 1
1254                                                           : 2),
1255                                         &seasonal_coef);
1256                         rra_current = rrd_tell(rrd_file);
1257                     }
1258                     if (rrd_test_error())
1259                         break;
1260                     /* loop over data sources within each RRA */
1261                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1262                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1263                                            i * (rrd.stat_head->ds_cnt) + ii,
1264                                            i, ii, scratch_idx, seasonal_coef);
1265                     }
1266                 }       /* end RRA Loop */
1267                 if (rrd_test_error())
1268                     break;
1269             }           /* end elapsed_pdp_st loop */
1270
1271             if (rrd_test_error())
1272                 break;
1273
1274             /* Ready to write to disk */
1275             /* Move sequentially through the file, writing one RRA at a time.
1276              * Note this architecture divorces the computation of CDP with
1277              * flushing updated RRA entries to disk. */
1278             for (i = 0, rra_start = rra_begin;
1279                  i < rrd.stat_head->rra_cnt;
1280                  rra_start +=
1281                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1282                  sizeof(rrd_value_t), i++) {
1283                 /* is there anything to write for this RRA? If not, continue. */
1284                 if (rra_step_cnt[i] == 0)
1285                     continue;
1286
1287                 /* write the first row */
1288 #ifdef DEBUG
1289                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1290 #endif
1291                 rrd.rra_ptr[i].cur_row++;
1292                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1293                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1294                 /* position on the first row */
1295                 rra_pos_tmp = rra_start +
1296                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1297                     sizeof(rrd_value_t);
1298                 if (rra_pos_tmp != rra_current) {
1299                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1300                         rrd_set_error("seek error in rrd");
1301                         break;
1302                     }
1303                     rra_current = rra_pos_tmp;
1304                 }
1305 #ifdef DEBUG
1306                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1307 #endif
1308                 scratch_idx = CDP_primary_val;
1309                 if (pcdp_summary != NULL) {
1310                     rra_time = (current_time - current_time
1311                                 % (rrd.rra_def[i].pdp_cnt *
1312                                    rrd.stat_head->pdp_step))
1313                         -
1314                         ((rra_step_cnt[i] -
1315                           1) * rrd.rra_def[i].pdp_cnt *
1316                          rrd.stat_head->pdp_step);
1317                 }
1318                 pcdp_summary =
1319                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1320                                   scratch_idx, pcdp_summary, &rra_time);
1321                 if (rrd_test_error())
1322                     break;
1323
1324                 /* write other rows of the bulk update, if any */
1325                 scratch_idx = CDP_secondary_val;
1326                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1327                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1328 #ifdef DEBUG
1329                         fprintf(stderr,
1330                                 "Wraparound for RRA %s, %lu updates left\n",
1331                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1332 #endif
1333                         /* wrap */
1334                         rrd.rra_ptr[i].cur_row = 0;
1335                         /* seek back to beginning of current rra */
1336                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1337                             rrd_set_error("seek error in rrd");
1338                             break;
1339                         }
1340 #ifdef DEBUG
1341                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1342                                 rrd_file->pos);
1343 #endif
1344                         rra_current = rra_start;
1345                     }
1346                     if (pcdp_summary != NULL) {
1347                         rra_time = (current_time - current_time
1348                                     % (rrd.rra_def[i].pdp_cnt *
1349                                        rrd.stat_head->pdp_step))
1350                             -
1351                             ((rra_step_cnt[i] -
1352                               2) * rrd.rra_def[i].pdp_cnt *
1353                              rrd.stat_head->pdp_step);
1354                     }
1355                     pcdp_summary =
1356                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1357                                       scratch_idx, pcdp_summary, &rra_time);
1358                 }
1359
1360                 if (rrd_test_error())
1361                     break;
1362             }           /* RRA LOOP */
1363
1364             /* break out of the argument parsing loop if error_string is set */
1365             if (rrd_test_error()) {
1366                 free(step_start);
1367                 break;
1368             }
1369
1370         }               /* endif a pdp_st has occurred */
1371         rrd.live_head->last_up = current_time;
1372         rrd.live_head->last_up_usec = current_time_usec;
1373         free(step_start);
1374     }                   /* function argument loop */
1375
1376     if (seasonal_coef != NULL)
1377         free(seasonal_coef);
1378     if (last_seasonal_coef != NULL)
1379         free(last_seasonal_coef);
1380     if (rra_step_cnt != NULL)
1381         free(rra_step_cnt);
1382     rpnstack_free(&rpnstack);
1383
1384 #if 0
1385     //rrd_flush(rrd_file);    //XXX: really needed?
1386 #endif
1387     /* if we got here and if there is an error and if the file has not been
1388      * written to, then close things up and return. */
1389     if (rrd_test_error()) {
1390         goto err_free_pdp_new;
1391     }
1392
1393     /* aargh ... that was tough ... so many loops ... anyway, its done.
1394      * we just need to write back the live header portion now*/
1395
1396     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1397                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1398                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1399                  SEEK_SET) != 0) {
1400         rrd_set_error("seek rrd for live header writeback");
1401         goto err_free_pdp_new;
1402     }
1403     /* for mmap, we did already write to the underlying mapping, so we do
1404        not need to write again.  */
1405 #ifndef HAVE_MMAP
1406     if (version >= 3) {
1407         if (rrd_write(rrd_file, rrd.live_head,
1408                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1409             rrd_set_error("rrd_write live_head to rrd");
1410             goto err_free_pdp_new;
1411         }
1412     } else {
1413         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1414                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1415             rrd_set_error("rrd_write live_head to rrd");
1416             goto err_free_pdp_new;
1417         }
1418     }
1419
1420
1421     if (rrd_write(rrd_file, rrd.pdp_prep,
1422                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1423         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1424         rrd_set_error("rrd_write pdp_prep to rrd");
1425         goto err_free_pdp_new;
1426     }
1427
1428     if (rrd_write(rrd_file, rrd.cdp_prep,
1429                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1430                   rrd.stat_head->ds_cnt)
1431         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1432                       rrd.stat_head->ds_cnt)) {
1433
1434         rrd_set_error("rrd_write cdp_prep to rrd");
1435         goto err_free_pdp_new;
1436     }
1437
1438     if (rrd_write(rrd_file, rrd.rra_ptr,
1439                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1440         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1441         rrd_set_error("rrd_write rra_ptr to rrd");
1442         goto err_free_pdp_new;
1443     }
1444 #endif
1445 #ifdef HAVE_POSIX_FADVISExxx
1446
1447     /* with update we have write ops, so they will probably not be done by now, this means
1448        the buffers will not get freed. But calling this for the whole file - header
1449        will let the data off the hook as soon as it is written when if it is from a previous
1450        update cycle. Calling fdsync to force things is much too hard here. */
1451
1452     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1453         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1454                       rrd_strerror(errno));
1455         goto err_free_pdp_new;
1456     }
1457 #endif
1458     /* rrd_flush(rrd_file); */
1459
1460     /* calling the smoothing code here guarantees at most
1461      * one smoothing operation per rrd_update call. Unfortunately,
1462      * it is possible with bulk updates, or a long-delayed update
1463      * for smoothing to occur off-schedule. This really isn't
1464      * critical except during the burning cycles. */
1465     if (schedule_smooth) {
1466
1467         rra_start = rra_begin;
1468         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1469             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1470                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1471 #ifdef DEBUG
1472                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1473 #endif
1474                 apply_smoother(&rrd, i, rra_start, rrd_file);
1475                 if (rrd_test_error())
1476                     break;
1477             }
1478             rra_start += rrd.rra_def[i].row_cnt
1479                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1480         }
1481 #ifdef HAVE_POSIX_FADVISExxx
1482         /* same procedure as above ... */
1483         if (0 !=
1484             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1485             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1486                           rrd_strerror(errno));
1487             goto err_free_pdp_new;
1488         }
1489 #endif
1490     }
1491
1492     rrd_free(&rrd);
1493     rrd_close(rrd_file);
1494
1495     free(pdp_new);
1496     free(tmpl_idx);
1497     free(pdp_temp);
1498     free(updvals);
1499     return (0);
1500
1501   err_free_pdp_new:
1502     free(pdp_new);
1503   err_free_tmpl_idx:
1504     free(tmpl_idx);
1505   err_free_pdp_temp:
1506     free(pdp_temp);
1507   err_free_updvals:
1508     free(updvals);
1509   err_close:
1510     rrd_close(rrd_file);
1511   err_free:
1512     rrd_free(&rrd);
1513   err_out:
1514     return (-1);
1515 }
1516
/*
 * Request an exclusive lock covering the whole RRD file.
 * The lock gets removed when the file is closed.
 *
 * in_file: open file descriptor of the RRD file
 *
 * The request is non-blocking (_LK_NBLCK on native Win32, F_SETLK
 * elsewhere): if the lock cannot be obtained the call fails instead of
 * waiting.
 *
 * returns 0 on success, otherwise the result of the underlying locking
 * call (-1 on failure)
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() wants a byte count, so the current file size is needed */
    if (_fstat(in_file, &st) == 0) {
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    /* The designated initializer zeroes every member that is not named
     * here (l_pid and any platform specific extras), so no indeterminate
     * bytes are handed to fcntl(). */
    struct flock lock = {
        .l_type = F_WRLCK,      /* exclusive write lock */
        .l_whence = SEEK_SET,   /* l_start counts from the start of file */
        .l_start = 0,           /* begin at the first byte */
        .l_len = 0              /* 0 means: up to the end of the file */
    };

    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}