Bernhard Fischer:
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
19
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
22
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and
 * dirty replacement.
 */
28 #include <sys/timeb.h>
29
30 #ifndef __MINGW32__
31 struct timeval {
32     time_t    tv_sec;   /* seconds */
33     long      tv_usec;  /* microseconds */
34 };
35 #endif
36
37 struct __timezone {
38     int       tz_minuteswest;   /* minutes W of Greenwich */
39     int       tz_dsttime;   /* type of dst correction */
40 };
41
42 static int gettimeofday(
43     struct timeval *t,
44     struct __timezone *tz)
45 {
46
47     struct _timeb current_time;
48
49     _ftime(&current_time);
50
51     t->tv_sec = current_time.time;
52     t->tv_usec = current_time.millitm * 1000;
53
54     return 0;
55 }
56
57 #endif
/*
 * Bring a time value as returned by gettimeofday into canonical form:
 * when the microsecond part is negative, one second is borrowed from
 * tv_sec and added to tv_usec. Exactly one second is borrowed, so the
 * result is only non-negative for inputs above -1000000 usec.
 */
static inline void normalize_time(
    struct timeval *t)
{
    /* already canonical: nothing to do */
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
70
71 static inline info_t *write_RRA_row(
72     rrd_file_t *rrd_file,
73     rrd_t *rrd,
74     unsigned long rra_idx,
75     unsigned long *rra_current,
76     unsigned short CDP_scratch_idx,
77     info_t *pcdp_summary,
78     time_t *rra_time)
79 {
80     unsigned long ds_idx, cdp_idx;
81     infoval   iv;
82
83     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84         /* compute the cdp index */
85         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91         if (pcdp_summary != NULL) {
92             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93             /* append info to the return hash */
94             pcdp_summary = info_push(pcdp_summary,
95                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96                                                    *rra_time,
97                                                    rrd->rra_def[rra_idx].
98                                                    cf_nam,
99                                                    rrd->rra_def[rra_idx].
100                                                    pdp_cnt,
101                                                    rrd->ds_def[ds_idx].
102                                                    ds_nam), RD_I_VAL, iv);
103         }
104         if (rrd_write
105             (rrd_file,
106              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109             return 0;
110         }
111         *rra_current += sizeof(rrd_value_t);
112     }
113     return (pcdp_summary);
114 }
115
116 int       rrd_update_r(
117     const char *filename,
118     const char *tmplt,
119     int argc,
120     const char **argv);
121 int       _rrd_update(
122     const char *filename,
123     const char *tmplt,
124     int argc,
125     const char **argv,
126     info_t *);
127
/* Yields Y when X is NaN and X otherwise. X is evaluated twice, so pass
 * only side-effect-free expressions. The expansion deliberately carries no
 * trailing semicolon so the macro can be used inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
129
130
131 info_t   *rrd_update_v(
132     int argc,
133     char **argv)
134 {
135     char     *tmplt = NULL;
136     info_t   *result = NULL;
137     infoval   rc;
138     struct option long_options[] = {
139         {"template", required_argument, 0, 't'},
140         {0, 0, 0, 0}
141     };
142
143     rc.u_int = -1;
144     optind = 0;
145     opterr = 0;         /* initialize getopt */
146
147     while (1) {
148         int       option_index = 0;
149         int       opt;
150
151         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
152
153         if (opt == EOF)
154             break;
155
156         switch (opt) {
157         case 't':
158             tmplt = optarg;
159             break;
160
161         case '?':
162             rrd_set_error("unknown option '%s'", argv[optind - 1]);
163             goto end_tag;
164         }
165     }
166
167     /* need at least 2 arguments: filename, data. */
168     if (argc - optind < 2) {
169         rrd_set_error("Not enough arguments");
170         goto end_tag;
171     }
172     rc.u_int = 0;
173     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174     rc.u_int = _rrd_update(argv[optind], tmplt,
175                            argc - optind - 1,
176                            (const char **) (argv + optind + 1), result);
177     result->value.u_int = rc.u_int;
178   end_tag:
179     return result;
180 }
181
/*
 * Command-line style update.
 *
 * argc/argv  argument vector; accepts -t/--template <tmplt>, followed by
 *            the file name and at least one data argument
 *
 * Returns -1 (with the error set through rrd_set_error()) when an unknown
 * option is seen or fewer than two non-option arguments remain; otherwise
 * returns whatever rrd_update_r() returns.
 */
int rrd_update(
    int argc,
    char **argv)
{
    struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    int       option_index = 0;
    int       opt;
    const char *tmplt = NULL;

    optind = 0;
    opterr = 0;         /* initialize getopt */

    while ((opt = getopt_long(argc, argv, "t:", long_options,
                              &option_index)) != EOF) {
        switch (opt) {
        case 't':
            /* optarg points into the caller's argv and rrd_update_r()
             * takes the template as const, so no private copy is made.
             * This avoids leaking a duplicate on the error returns below
             * or when the option is given more than once. */
            tmplt = optarg;
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return (-1);

        default:
            break;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
227
228 int rrd_update_r(
229     const char *filename,
230     const char *tmplt,
231     int argc,
232     const char **argv)
233 {
234     return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
236
237 int _rrd_update(
238     const char *filename,
239     const char *tmplt,
240     int argc,
241     const char **argv,
242     info_t *pcdp_summary)
243 {
244
245     int       arg_i = 2;
246     short     j;
247     unsigned long i, ii, iii = 1;
248
249     unsigned long rra_begin;    /* byte pointer to the rra
250                                  * area in the rrd file.  this
251                                  * pointer never changes value */
252     unsigned long rra_start;    /* byte pointer to the rra
253                                  * area in the rrd file.  this
254                                  * pointer changes as each rrd is
255                                  * processed. */
256     unsigned long rra_current;  /* byte pointer to the current write
257                                  * spot in the rrd file. */
258     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
259     double    interval, pre_int, post_int;  /* interval between this and
260                                              * the last run */
261     unsigned long proc_pdp_st;  /* which pdp_st was the last
262                                  * to be processed */
263     unsigned long occu_pdp_st;  /* when was the pdp_st
264                                  * before the last update
265                                  * time */
266     unsigned long proc_pdp_age; /* how old was the data in
267                                  * the pdp prep area when it
268                                  * was last updated */
269     unsigned long occu_pdp_age; /* how long ago was the last
270                                  * pdp_step time */
271     rrd_value_t *pdp_new;   /* prepare the incoming data
272                              * to be added the the
273                              * existing entry */
274     rrd_value_t *pdp_temp;  /* prepare the pdp values
275                              * to be added the the
276                              * cdp values */
277
278     long     *tmpl_idx; /* index representing the settings
279                            transported by the tmplt index */
280     unsigned long tmpl_cnt = 2; /* time and data */
281
282     rrd_t     rrd;
283     time_t    current_time = 0;
284     time_t    rra_time = 0; /* time of update for a RRA */
285     unsigned long current_time_usec = 0;    /* microseconds part of current time */
286     struct timeval tmp_time;    /* used for time conversion */
287
288     char    **updvals;
289     int       schedule_smooth = 0;
290     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
291
292     /* a vector of future Holt-Winters seasonal coefs */
293     unsigned long elapsed_pdp_st;
294
295     /* number of elapsed PDP steps since last update */
296     unsigned long *rra_step_cnt = NULL;
297
298     /* number of rows to be updated in an RRA for a data
299      * value. */
300     unsigned long start_pdp_offset;
301
302     /* number of PDP steps since the last update that
303      * are assigned to the first CDP to be generated
304      * since the last update. */
305     unsigned short scratch_idx;
306
307     /* index into the CDP scratch array */
308     enum cf_en current_cf;
309
310     /* numeric id of the current consolidation function */
311     rpnstack_t rpnstack;    /* used for COMPUTE DS */
312     int       version;  /* rrd version */
313     char     *endptr;   /* used in the conversion */
314     rrd_file_t *rrd_file;
315
316     rpnstack_init(&rpnstack);
317
318     /* need at least 1 arguments: data. */
319     if (argc < 1) {
320         rrd_set_error("Not enough arguments");
321         goto err_out;
322     }
323
324     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325     if (rrd_file == NULL) {
326         goto err_free;
327     }
328     /* We are now at the beginning of the rra's */
329     rra_current = rra_start = rra_begin = rrd_file->header_len;
330
331     /* initialize time */
332     version = atoi(rrd.stat_head->version);
333     gettimeofday(&tmp_time, 0);
334     normalize_time(&tmp_time);
335     current_time = tmp_time.tv_sec;
336     if (version >= 3) {
337         current_time_usec = tmp_time.tv_usec;
338     } else {
339         current_time_usec = 0;
340     }
341
342     /* get exclusive lock to whole file.
343      * lock gets removed when we close the file.
344      */
345     if (LockRRD(rrd_file->fd) != 0) {
346         rrd_set_error("could not lock RRD");
347         goto err_close;
348     }
349
350     if ((updvals =
351          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352         rrd_set_error("allocating updvals pointer array");
353         goto err_close;
354     }
355
356     if ((pdp_temp = malloc(sizeof(rrd_value_t)
357                            * rrd.stat_head->ds_cnt)) == NULL) {
358         rrd_set_error("allocating pdp_temp ...");
359         goto err_free_updvals;
360     }
361
362     if ((tmpl_idx = malloc(sizeof(unsigned long)
363                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364         rrd_set_error("allocating tmpl_idx ...");
365         goto err_free_pdp_temp;
366     }
367     /* initialize tmplt redirector */
368     /* default config example (assume DS 1 is a CDEF DS)
369        tmpl_idx[0] -> 0; (time)
370        tmpl_idx[1] -> 1; (DS 0)
371        tmpl_idx[2] -> 3; (DS 2)
372        tmpl_idx[3] -> 4; (DS 3) */
373     tmpl_idx[0] = 0;    /* time */
374     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
376             tmpl_idx[ii++] = i;
377     }
378     tmpl_cnt = ii;
379
380     if (tmplt) {
381         /* we should work on a writeable copy here */
382         char     *dsname;
383         unsigned int tmpl_len;
384         char     *tmplt_copy = strdup(tmplt);
385
386         dsname = tmplt_copy;
387         tmpl_cnt = 1;   /* the first entry is the time */
388         tmpl_len = strlen(tmplt_copy);
389         for (i = 0; i <= tmpl_len; i++) {
390             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391                 tmplt_copy[i] = '\0';
392                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
393                     rrd_set_error
394                         ("tmplt contains more DS definitions than RRD");
395                     goto err_free_tmpl_idx;
396                 }
397                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398                     rrd_set_error("unknown DS name '%s'", dsname);
399                     goto err_free_tmpl_idx;
400                 } else {
401                     /* the first element is always the time */
402                     tmpl_idx[tmpl_cnt - 1]++;
403                     /* go to the next entry on the tmplt_copy */
404                     dsname = &tmplt_copy[i + 1];
405                     /* fix the damage we did before */
406                     if (i < tmpl_len) {
407                         tmplt_copy[i] = ':';
408                     }
409
410                 }
411             }
412         }
413         free(tmplt_copy);
414     }
415     if ((pdp_new = malloc(sizeof(rrd_value_t)
416                           * rrd.stat_head->ds_cnt)) == NULL) {
417         rrd_set_error("allocating pdp_new ...");
418         goto err_free_tmpl_idx;
419     }
420     /* loop through the arguments. */
421     for (arg_i = 0; arg_i < argc; arg_i++) {
422         char     *stepper = strdup(argv[arg_i]);
423         char     *step_start = stepper;
424         char     *p;
425         char     *parsetime_error = NULL;
426         enum { atstyle, normal } timesyntax;
427         struct rrd_time_value ds_tv;
428
429         if (stepper == NULL) {
430             rrd_set_error("failed duplication argv entry");
431             free(step_start);
432             goto err_free_pdp_new;
433         }
434         /* initialize all ds input to unknown except the first one
435            which has always got to be set */
436         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
437             updvals[ii] = "U";
438         updvals[0] = stepper;
439         /* separate all ds elements; first must be examined separately
440            due to alternate time syntax */
441         if ((p = strchr(stepper, '@')) != NULL) {
442             timesyntax = atstyle;
443             *p = '\0';
444             stepper = p + 1;
445         } else if ((p = strchr(stepper, ':')) != NULL) {
446             timesyntax = normal;
447             *p = '\0';
448             stepper = p + 1;
449         } else {
450             rrd_set_error
451                 ("expected timestamp not found in data source from %s",
452                  argv[arg_i]);
453             free(step_start);
454             break;
455         }
456         ii = 1;
457         updvals[tmpl_idx[ii]] = stepper;
458         while (*stepper) {
459             if (*stepper == ':') {
460                 *stepper = '\0';
461                 ii++;
462                 if (ii < tmpl_cnt) {
463                     updvals[tmpl_idx[ii]] = stepper + 1;
464                 }
465             }
466             stepper++;
467         }
468
469         if (ii != tmpl_cnt - 1) {
470             rrd_set_error
471                 ("expected %lu data source readings (got %lu) from %s",
472                  tmpl_cnt - 1, ii, argv[arg_i]);
473             free(step_start);
474             break;
475         }
476
477         /* get the time from the reading ... handle N */
478         if (timesyntax == atstyle) {
479             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
480                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
481                 free(step_start);
482                 break;
483             }
484             if (ds_tv.type == RELATIVE_TO_END_TIME ||
485                 ds_tv.type == RELATIVE_TO_START_TIME) {
486                 rrd_set_error("specifying time relative to the 'start' "
487                               "or 'end' makes no sense here: %s", updvals[0]);
488                 free(step_start);
489                 break;
490             }
491
492             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
493
494             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
495
496         } else if (strcmp(updvals[0], "N") == 0) {
497             gettimeofday(&tmp_time, 0);
498             normalize_time(&tmp_time);
499             current_time = tmp_time.tv_sec;
500             current_time_usec = tmp_time.tv_usec;
501         } else {
502             double    tmp;
503
504             tmp = strtod(updvals[0], 0);
505             current_time = floor(tmp);
506             current_time_usec =
507                 (long) ((tmp - (double) current_time) * 1000000.0);
508         }
509         /* dont do any correction for old version RRDs */
510         if (version < 3)
511             current_time_usec = 0;
512
513         if (current_time < rrd.live_head->last_up ||
514             (current_time == rrd.live_head->last_up &&
515              (long) current_time_usec <=
516              (long) rrd.live_head->last_up_usec)) {
517             rrd_set_error("illegal attempt to update using time %ld when "
518                           "last update time is %ld (minimum one second step)",
519                           current_time, rrd.live_head->last_up);
520             free(step_start);
521             break;
522         }
523
524         /* seek to the beginning of the rra's */
525         if (rra_current != rra_begin) {
526 #ifndef HAVE_MMAP
527             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
528                 rrd_set_error("seek error in rrd");
529                 free(step_start);
530                 break;
531             }
532 #endif
533             rra_current = rra_begin;
534         }
535         rra_start = rra_begin;
536
537         /* when was the current pdp started */
538         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
539         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
540
541         /* when did the last pdp_st occur */
542         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
543         occu_pdp_st = current_time - occu_pdp_age;
544
545         /* interval = current_time - rrd.live_head->last_up; */
546         interval = (double) (current_time - rrd.live_head->last_up)
547             + (double) ((long) current_time_usec -
548                         (long) rrd.live_head->last_up_usec) / 1000000.0;
549
550         if (occu_pdp_st > proc_pdp_st) {
551             /* OK we passed the pdp_st moment */
552             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
553                                                                      * occurred before the latest
554                                                                      * pdp_st moment*/
555             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
556             post_int = occu_pdp_age;    /* how much after it */
557             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
558         } else {
559             pre_int = interval;
560             post_int = 0;
561         }
562
563 #ifdef DEBUG
564         printf("proc_pdp_age %lu\t"
565                "proc_pdp_st %lu\t"
566                "occu_pfp_age %lu\t"
567                "occu_pdp_st %lu\t"
568                "int %lf\t"
569                "pre_int %lf\t"
570                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
571                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
572 #endif
573
574         /* process the data sources and update the pdp_prep 
575          * area accordingly */
576         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
577             enum dst_en dst_idx;
578
579             dst_idx = dst_conv(rrd.ds_def[i].dst);
580
581             /* make sure we do not build diffs with old last_ds values */
582             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
583                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
584                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
585             }
586
587             /* NOTE: DST_CDEF should never enter this if block, because
588              * updvals[i+1][0] is initialized to 'U'; unless the caller
589              * accidently specified a value for the DST_CDEF. To handle
590              * this case, an extra check is required. */
591
592             if ((updvals[i + 1][0] != 'U') &&
593                 (dst_idx != DST_CDEF) &&
594                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
595                 double    rate = DNAN;
596
597                 /* the data source type defines how to process the data */
598                 /* pdp_new contains rate * time ... eg the bytes
599                  * transferred during the interval. Doing it this way saves
600                  * a lot of math operations */
601                 switch (dst_idx) {
602                 case DST_COUNTER:
603                 case DST_DERIVE:
604                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
605                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
606                             if ((updvals[i + 1][ii] < '0'
607                                  || updvals[i + 1][ii] > '9') && (ii != 0
608                                                                   && updvals[i
609                                                                              +
610                                                                              1]
611                                                                   [ii] !=
612                                                                   '-')) {
613                                 rrd_set_error("not a simple integer: '%s'",
614                                               updvals[i + 1]);
615                                 break;
616                             }
617                         }
618                         if (rrd_test_error()) {
619                             break;
620                         }
621                         pdp_new[i] =
622                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
623                         if (dst_idx == DST_COUNTER) {
624                             /* simple overflow catcher suggested by Andres Kroonmaa */
625                             /* this will fail terribly for non 32 or 64 bit counters ... */
626                             /* are there any others in SNMP land ? */
627                             if (pdp_new[i] < (double) 0.0)
628                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
629                             if (pdp_new[i] < (double) 0.0)
630                                 pdp_new[i] += (double) 18446744069414584320.0;
631                             /* 2^64-2^32 */ ;
632                         }
633                         rate = pdp_new[i] / interval;
634                     } else {
635                         pdp_new[i] = DNAN;
636                     }
637                     break;
638                 case DST_ABSOLUTE:
639                     errno = 0;
640                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
641                     if (errno > 0) {
642                         rrd_set_error("converting '%s' to float: %s",
643                                       updvals[i + 1], rrd_strerror(errno));
644                         break;
645                     };
646                     if (endptr[0] != '\0') {
647                         rrd_set_error
648                             ("conversion of '%s' to float not complete: tail '%s'",
649                              updvals[i + 1], endptr);
650                         break;
651                     }
652                     rate = pdp_new[i] / interval;
653                     break;
654                 case DST_GAUGE:
655                     errno = 0;
656                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
657                     if (errno > 0) {
658                         rrd_set_error("converting '%s' to float: %s",
659                                       updvals[i + 1], rrd_strerror(errno));
660                         break;
661                     };
662                     if (endptr[0] != '\0') {
663                         rrd_set_error
664                             ("conversion of '%s' to float not complete: tail '%s'",
665                              updvals[i + 1], endptr);
666                         break;
667                     }
668                     rate = pdp_new[i] / interval;
669                     break;
670                 default:
671                     rrd_set_error("rrd contains unknown DS type : '%s'",
672                                   rrd.ds_def[i].dst);
673                     break;
674                 }
675                 /* break out of this for loop if the error string is set */
676                 if (rrd_test_error()) {
677                     break;
678                 }
679                 /* make sure pdp_temp is neither too large or too small
680                  * if any of these occur it becomes unknown ...
681                  * sorry folks ... */
682                 if (!isnan(rate) &&
683                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
685                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
687                     pdp_new[i] = DNAN;
688                 }
689             } else {
690                 /* no news is news all the same */
691                 pdp_new[i] = DNAN;
692             }
693
694
695             /* make a copy of the command line argument for the next run */
696 #ifdef DEBUG
697             fprintf(stderr,
698                     "prep ds[%lu]\t"
699                     "last_arg '%s'\t"
700                     "this_arg '%s'\t"
701                     "pdp_new %10.2f\n",
702                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
703 #endif
704             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
705             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
706         }
707         /* break out of the argument parsing loop if the error_string is set */
708         if (rrd_test_error()) {
709             free(step_start);
710             break;
711         }
712         /* has a pdp_st moment occurred since the last run ? */
713
714         if (proc_pdp_st == occu_pdp_st) {
715             /* no we have not passed a pdp_st moment. therefore update is simple */
716
717             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
718                 if (isnan(pdp_new[i])) {
719                     /* this is not realy accurate if we use subsecond data arival time
720                        should have thought of it when going subsecond resolution ...
721                        sorry next format change we will have it! */
722                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
723                         floor(interval);
724                 } else {
725                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
726                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
727                     } else {
728                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
729                     }
730                 }
731 #ifdef DEBUG
732                 fprintf(stderr,
733                         "NO PDP  ds[%lu]\t"
734                         "value %10.2f\t"
735                         "unkn_sec %5lu\n",
736                         i,
737                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
738                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740             }
741         } else {
742             /* an pdp_st has occurred. */
743
744             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
745                rate*seconds which occurred up to the last run.
746                pdp_new[] contains rate*seconds from the latest run.
747                pdp_temp[] will contain the rate for cdp */
748
749             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
750                 /* update pdp_prep to the current pdp_st. */
751                 double    pre_unknown = 0.0;
752
753                 if (isnan(pdp_new[i])) {
754                     /* a final bit of unkonwn to be added bevore calculation
755                        we use a temporary variable for this so that we
756                        don't have to turn integer lines before using the value */
757                     pre_unknown = pre_int;
758                 } else {
759                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
760                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
761                             pdp_new[i] / interval * pre_int;
762                     } else {
763                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764                             pdp_new[i] / interval * pre_int;
765                     }
766                 }
767
768
769                 /* if too much of the pdp_prep is unknown we dump it */
770                 if (
771                        /* removed because this does not agree with the
772                           definition that a heartbeat can be unknown */
773                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775                        /* if the interval is larger thatn mrhb we get NAN */
776                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777                        (occu_pdp_st - proc_pdp_st <=
778                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779                     pdp_temp[i] = DNAN;
780                 } else {
781                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782                         / ((double) (occu_pdp_st - proc_pdp_st
783                                      -
784                                      rrd.pdp_prep[i].
785                                      scratch[PDP_unkn_sec_cnt].u_cnt)
786                            - pre_unknown);
787                 }
788
789                 /* process CDEF data sources; remember each CDEF DS can
790                  * only reference other DS with a lower index number */
791                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
792                     rpnp_t   *rpnp;
793
794                     rpnp =
795                         rpn_expand((rpn_cdefds_t *) &
796                                    (rrd.ds_def[i].par[DS_cdef]));
797                     /* substitute data values for OP_VARIABLE nodes */
798                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
799                         if (rpnp[ii].op == OP_VARIABLE) {
800                             rpnp[ii].op = OP_NUMBER;
801                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
802                         }
803                     }
804                     /* run the rpn calculator */
805                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
806                         free(rpnp);
807                         break;  /* exits the data sources pdp_temp loop */
808                     }
809                 }
810
811                 /* make pdp_prep ready for the next run */
812                 if (isnan(pdp_new[i])) {
813                     /* this is not really accurate if we use subsecond data arrival time
814                        should have thought of it when going subsecond resolution ...
815                        sorry next format change we will have it! */
816                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
817                         floor(post_int);
818                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819                 } else {
820                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
822                         pdp_new[i] / interval * post_int;
823                 }
824
825 #ifdef DEBUG
826                 fprintf(stderr,
827                         "PDP UPD ds[%lu]\t"
828                         "pdp_temp %10.2f\t"
829                         "new_prep %10.2f\t"
830                         "new_unkn_sec %5lu\n",
831                         i, pdp_temp[i],
832                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
833                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835             }
836
837             /* if there were errors during the last loop, bail out here */
838             if (rrd_test_error()) {
839                 free(step_start);
840                 break;
841             }
842
843             /* compute the number of elapsed pdp_st moments */
844             elapsed_pdp_st =
845                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
846 #ifdef DEBUG
847             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
848 #endif
849             if (rra_step_cnt == NULL) {
850                 rra_step_cnt = (unsigned long *)
851                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
852             }
853
854             for (i = 0, rra_start = rra_begin;
855                  i < rrd.stat_head->rra_cnt;
856                  rra_start +=
857                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
858                  sizeof(rrd_value_t), i++) {
859                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861                     (proc_pdp_st / rrd.stat_head->pdp_step) %
862                     rrd.rra_def[i].pdp_cnt;
863                 if (start_pdp_offset <= elapsed_pdp_st) {
864                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
865                         rrd.rra_def[i].pdp_cnt + 1;
866                 } else {
867                     rra_step_cnt[i] = 0;
868                 }
869
870                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
871                     /* If this is a bulk update, we need to skip ahead in
872                        the seasonal arrays so that they will be correct for
873                        the next observed value;
874                        note that for the bulk update itself, no update will
875                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
876                        and DEVPREDICT will be set to DNAN. */
877                     if (rra_step_cnt[i] > 2) {
878                         /* skip update by resetting rra_step_cnt[i],
879                            note that this is not data source specific; this is
880                            due to the bulk update, not a DNAN value for the
881                            specific data source. */
882                         rra_step_cnt[i] = 0;
883                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
884                                         elapsed_pdp_st, &last_seasonal_coef);
885                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
886                                         elapsed_pdp_st + 1, &seasonal_coef);
887                     }
888
889                     /* periodically run a smoother for seasonal effects */
890                     /* Need to use first cdp parameter buffer to track
891                      * burnin (burnin requires a specific smoothing schedule).
892                      * The CDP_init_seasonal parameter is really an RRA level,
893                      * not a data source within RRA level parameter, but the rra_def
894                      * is read only for rrd_update (not flushed to disk). */
895                     iii = i * (rrd.stat_head->ds_cnt);
896                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897                         <= BURNIN_CYCLES) {
898                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
899                             > rrd.rra_def[i].row_cnt - 1) {
900                             /* mark off one of the burnin cycles */
901                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
902                                u_cnt);
903                             schedule_smooth = 1;
904                         }
905                     } else {
906                         /* someone has no doubt invented a trick to deal with this
907                          * wrap around, but at least this code is clear. */
908                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
909                             u_cnt > rrd.rra_ptr[i].cur_row) {
910                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
911                              * mapping between PDP and CDP */
912                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
913                                 >=
914                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
915                                 u_cnt) {
916 #ifdef DEBUG
917                                 fprintf(stderr,
918                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919                                         rrd.rra_ptr[i].cur_row,
920                                         elapsed_pdp_st,
921                                         rrd.rra_def[i].
922                                         par[RRA_seasonal_smooth_idx].u_cnt);
923 #endif
924                                 schedule_smooth = 1;
925                             }
926                         } else {
927                             /* can't rely on negative numbers because we are working with
928                              * unsigned values */
929                             /* Don't need modulus here. If we've wrapped more than once, only
930                              * one smooth is executed at the end. */
931                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
932                                 rrd.rra_def[i].row_cnt
933                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
934                                 rrd.rra_def[i].row_cnt >=
935                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
936                                 u_cnt) {
937 #ifdef DEBUG
938                                 fprintf(stderr,
939                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
940                                         rrd.rra_ptr[i].cur_row,
941                                         elapsed_pdp_st,
942                                         rrd.rra_def[i].
943                                         par[RRA_seasonal_smooth_idx].u_cnt);
944 #endif
945                                 schedule_smooth = 1;
946                             }
947                         }
948                     }
949
950                     rra_current = rrd_tell(rrd_file);
951                 }
952                 /* if cf is DEVSEASONAL or SEASONAL */
953                 if (rrd_test_error())
954                     break;
955
956                 /* update CDP_PREP areas */
957                 /* loop over data sources within each RRA */
958                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
959
960                     /* iii indexes the CDP prep area for this data source within the RRA */
961                     iii = i * rrd.stat_head->ds_cnt + ii;
962
963                     if (rrd.rra_def[i].pdp_cnt > 1) {
964
965                         if (rra_step_cnt[i] > 0) {
966                             /* If we are in this block, at least 1 CDP value will be written to
967                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
968                              * to be written, then the "fill in" value is the CDP_secondary_val
969                              * entry. */
970                             if (isnan(pdp_temp[ii])) {
971                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
972                                     u_cnt += start_pdp_offset;
973                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
974                                     u_val = DNAN;
975                             } else {
976                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
977                                  * CDP data entries. No matter the CF, the value is the same because
978                                  * the average, max, min, and last of a list of identical values is
979                                  * the same, namely, the value itself. */
980                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
981                                     u_val = pdp_temp[ii];
982                             }
983
984                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
985                                 u_cnt >
986                                 rrd.rra_def[i].pdp_cnt *
987                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
988                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
989                                     u_val = DNAN;
990                                 /* initialize carry over */
991                                 if (current_cf == CF_AVERAGE) {
992                                     if (isnan(pdp_temp[ii])) {
993                                         rrd.cdp_prep[iii].scratch[CDP_val].
994                                             u_val = DNAN;
995                                     } else {
996                                         rrd.cdp_prep[iii].scratch[CDP_val].
997                                             u_val =
998                                             pdp_temp[ii] *
999                                             ((elapsed_pdp_st -
1000                                               start_pdp_offset) %
1001                                              rrd.rra_def[i].pdp_cnt);
1002                                     }
1003                                 } else {
1004                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1005                                         pdp_temp[ii];
1006                                 }
1007                             } else {
1008                                 rrd_value_t cum_val, cur_val;
1009
1010                                 switch (current_cf) {
1011                                 case CF_AVERAGE:
1012                                     cum_val =
1013                                         IFDNAN(rrd.cdp_prep[iii].
1014                                                scratch[CDP_val].u_val, 0.0);
1015                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1016                                     rrd.cdp_prep[iii].
1017                                         scratch[CDP_primary_val].u_val =
1018                                         (cum_val +
1019                                          cur_val * start_pdp_offset) /
1020                                         (rrd.rra_def[i].pdp_cnt -
1021                                          rrd.cdp_prep[iii].
1022                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1023                                     /* initialize carry over value */
1024                                     if (isnan(pdp_temp[ii])) {
1025                                         rrd.cdp_prep[iii].scratch[CDP_val].
1026                                             u_val = DNAN;
1027                                     } else {
1028                                         rrd.cdp_prep[iii].scratch[CDP_val].
1029                                             u_val =
1030                                             pdp_temp[ii] *
1031                                             ((elapsed_pdp_st -
1032                                               start_pdp_offset) %
1033                                              rrd.rra_def[i].pdp_cnt);
1034                                     }
1035                                     break;
1036                                 case CF_MAXIMUM:
1037                                     cum_val =
1038                                         IFDNAN(rrd.cdp_prep[iii].
1039                                                scratch[CDP_val].u_val, -DINF);
1040                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1041 #ifdef DEBUG
1042                                     if (isnan
1043                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1044                                          u_val) && isnan(pdp_temp[ii])) {
1045                                         fprintf(stderr,
1046                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1047                                                 i, ii);
1048                                         exit(-1);
1049                                     }
1050 #endif
1051                                     if (cur_val > cum_val)
1052                                         rrd.cdp_prep[iii].
1053                                             scratch[CDP_primary_val].u_val =
1054                                             cur_val;
1055                                     else
1056                                         rrd.cdp_prep[iii].
1057                                             scratch[CDP_primary_val].u_val =
1058                                             cum_val;
1059                                     /* initialize carry over value */
1060                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1061                                         pdp_temp[ii];
1062                                     break;
1063                                 case CF_MINIMUM:
1064                                     cum_val =
1065                                         IFDNAN(rrd.cdp_prep[iii].
1066                                                scratch[CDP_val].u_val, DINF);
1067                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1068 #ifdef DEBUG
1069                                     if (isnan
1070                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1071                                          u_val) && isnan(pdp_temp[ii])) {
1072                                         fprintf(stderr,
1073                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1074                                                 i, ii);
1075                                         exit(-1);
1076                                     }
1077 #endif
1078                                     if (cur_val < cum_val)
1079                                         rrd.cdp_prep[iii].
1080                                             scratch[CDP_primary_val].u_val =
1081                                             cur_val;
1082                                     else
1083                                         rrd.cdp_prep[iii].
1084                                             scratch[CDP_primary_val].u_val =
1085                                             cum_val;
1086                                     /* initialize carry over value */
1087                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1088                                         pdp_temp[ii];
1089                                     break;
1090                                 case CF_LAST:
1091                                 default:
1092                                     rrd.cdp_prep[iii].
1093                                         scratch[CDP_primary_val].u_val =
1094                                         pdp_temp[ii];
1095                                     /* initialize carry over value */
1096                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1097                                         pdp_temp[ii];
1098                                     break;
1099                                 }
1100                             }   /* endif meets xff value requirement for a valid value */
1101                             /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1102                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1103                             if (isnan(pdp_temp[ii]))
1104                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1105                                     u_cnt =
1106                                     (elapsed_pdp_st -
1107                                      start_pdp_offset) %
1108                                     rrd.rra_def[i].pdp_cnt;
1109                             else
1110                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1111                                     u_cnt = 0;
1112                         } else {    /* rra_step_cnt[i]  == 0 */
1113
1114 #ifdef DEBUG
1115                             if (isnan
1116                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1117                                 fprintf(stderr,
1118                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1119                                         i, ii);
1120                             } else {
1121                                 fprintf(stderr,
1122                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1123                                         i, ii,
1124                                         rrd.cdp_prep[iii].scratch[CDP_val].
1125                                         u_val);
1126                             }
1127 #endif
1128                             if (isnan(pdp_temp[ii])) {
1129                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1130                                     u_cnt += elapsed_pdp_st;
1131                             } else
1132                                 if (isnan
1133                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1134                                      u_val)) {
1135                                 if (current_cf == CF_AVERAGE) {
1136                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1137                                         pdp_temp[ii] * elapsed_pdp_st;
1138                                 } else {
1139                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1140                                         pdp_temp[ii];
1141                                 }
1142 #ifdef DEBUG
1143                                 fprintf(stderr,
1144                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1145                                         i, ii,
1146                                         rrd.cdp_prep[iii].scratch[CDP_val].
1147                                         u_val);
1148 #endif
1149                             } else {
1150                                 switch (current_cf) {
1151                                 case CF_AVERAGE:
1152                                     rrd.cdp_prep[iii].scratch[CDP_val].
1153                                         u_val +=
1154                                         pdp_temp[ii] * elapsed_pdp_st;
1155                                     break;
1156                                 case CF_MINIMUM:
1157                                     if (pdp_temp[ii] <
1158                                         rrd.cdp_prep[iii].scratch[CDP_val].
1159                                         u_val)
1160                                         rrd.cdp_prep[iii].scratch[CDP_val].
1161                                             u_val = pdp_temp[ii];
1162                                     break;
1163                                 case CF_MAXIMUM:
1164                                     if (pdp_temp[ii] >
1165                                         rrd.cdp_prep[iii].scratch[CDP_val].
1166                                         u_val)
1167                                         rrd.cdp_prep[iii].scratch[CDP_val].
1168                                             u_val = pdp_temp[ii];
1169                                     break;
1170                                 case CF_LAST:
1171                                 default:
1172                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1173                                         pdp_temp[ii];
1174                                     break;
1175                                 }
1176                             }
1177                         }
1178                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1179                         if (elapsed_pdp_st > 2) {
1180                             switch (current_cf) {
1181                             case CF_AVERAGE:
1182                             default:
1183                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1184                                     u_val = pdp_temp[ii];
1185                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1186                                     u_val = pdp_temp[ii];
1187                                 break;
1188                             case CF_SEASONAL:
1189                             case CF_DEVSEASONAL:
1190                                 /* need to update cached seasonal values, so they are consistent
1191                                  * with the bulk update */
1192                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1193                                  * CDP_last_deviation are the same. */
1194                                 rrd.cdp_prep[iii].
1195                                     scratch[CDP_hw_last_seasonal].u_val =
1196                                     last_seasonal_coef[ii];
1197                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1198                                     u_val = seasonal_coef[ii];
1199                                 break;
1200                             case CF_HWPREDICT:
1201                             case CF_MHWPREDICT:
1202                                 /* need to update the null_count and last_null_count.
1203                                  * even do this for non-DNAN pdp_temp because the
1204                                  * algorithm is not learning from batch updates. */
1205                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1206                                     u_cnt += elapsed_pdp_st;
1207                                 rrd.cdp_prep[iii].
1208                                     scratch[CDP_last_null_count].u_cnt +=
1209                                     elapsed_pdp_st - 1;
1210                                 /* fall through */
1211                             case CF_DEVPREDICT:
1212                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1213                                     u_val = DNAN;
1214                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1215                                     u_val = DNAN;
1216                                 break;
1217                             case CF_FAILURES:
1218                                 /* do not count missed bulk values as failures */
1219                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1220                                     u_val = 0;
1221                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1222                                     u_val = 0;
1223                                 /* need to reset violations buffer.
1224                                  * could do this more carefully, but for now, just
1225                                  * assume a bulk update wipes away all violations. */
1226                                 erase_violations(&rrd, iii, i);
1227                                 break;
1228                             }
1229                         }
1230                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1231
1232                     if (rrd_test_error())
1233                         break;
1234
1235                 }       /* endif data sources loop */
1236             }           /* end RRA Loop */
1237
1238             /* this loop is only entered if elapsed_pdp_st < 3 */
1239             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1240                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1241                 for (i = 0, rra_start = rra_begin;
1242                      i < rrd.stat_head->rra_cnt;
1243                      rra_start +=
1244                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1245                      sizeof(rrd_value_t), i++) {
1246                     if (rrd.rra_def[i].pdp_cnt > 1)
1247                         continue;
1248
1249                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1250                     if (current_cf == CF_SEASONAL
1251                         || current_cf == CF_DEVSEASONAL) {
1252                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1253                                         elapsed_pdp_st + (scratch_idx ==
1254                                                           CDP_primary_val ? 1
1255                                                           : 2),
1256                                         &seasonal_coef);
1257                         rra_current = rrd_tell(rrd_file);
1258                     }
1259                     if (rrd_test_error())
1260                         break;
1261                     /* loop over data sources within each RRA */
1262                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1263                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1264                                            i * (rrd.stat_head->ds_cnt) + ii,
1265                                            i, ii, scratch_idx, seasonal_coef);
1266                     }
1267                 }       /* end RRA Loop */
1268                 if (rrd_test_error())
1269                     break;
1270             }           /* end elapsed_pdp_st loop */
1271
1272             if (rrd_test_error())
1273                 break;
1274
1275             /* Ready to write to disk */
1276             /* Move sequentially through the file, writing one RRA at a time.
1277              * Note this architecture divorces the computation of CDP with
1278              * flushing updated RRA entries to disk. */
1279             for (i = 0, rra_start = rra_begin;
1280                  i < rrd.stat_head->rra_cnt;
1281                  rra_start +=
1282                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1283                  sizeof(rrd_value_t), i++) {
1284                 /* is there anything to write for this RRA? If not, continue. */
1285                 if (rra_step_cnt[i] == 0)
1286                     continue;
1287
1288                 /* write the first row */
1289 #ifdef DEBUG
1290                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1291 #endif
1292                 rrd.rra_ptr[i].cur_row++;
1293                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1294                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1295                 /* position on the first row */
1296                 rra_pos_tmp = rra_start +
1297                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1298                     sizeof(rrd_value_t);
1299                 if (rra_pos_tmp != rra_current) {
1300                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1301                         rrd_set_error("seek error in rrd");
1302                         break;
1303                     }
1304                     rra_current = rra_pos_tmp;
1305                 }
1306 #ifdef DEBUG
1307                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1308 #endif
1309                 scratch_idx = CDP_primary_val;
1310                 if (pcdp_summary != NULL) {
1311                     rra_time = (current_time - current_time
1312                                 % (rrd.rra_def[i].pdp_cnt *
1313                                    rrd.stat_head->pdp_step))
1314                         -
1315                         ((rra_step_cnt[i] -
1316                           1) * rrd.rra_def[i].pdp_cnt *
1317                          rrd.stat_head->pdp_step);
1318                 }
1319                 pcdp_summary =
1320                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1321                                   scratch_idx, pcdp_summary, &rra_time);
1322                 if (rrd_test_error())
1323                     break;
1324
1325                 /* write other rows of the bulk update, if any */
1326                 scratch_idx = CDP_secondary_val;
1327                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1328                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1329 #ifdef DEBUG
1330                         fprintf(stderr,
1331                                 "Wraparound for RRA %s, %lu updates left\n",
1332                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1333 #endif
1334                         /* wrap */
1335                         rrd.rra_ptr[i].cur_row = 0;
1336                         /* seek back to beginning of current rra */
1337                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1338                             rrd_set_error("seek error in rrd");
1339                             break;
1340                         }
1341 #ifdef DEBUG
1342                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1343                                 rrd_file->pos);
1344 #endif
1345                         rra_current = rra_start;
1346                     }
1347                     if (pcdp_summary != NULL) {
1348                         rra_time = (current_time - current_time
1349                                     % (rrd.rra_def[i].pdp_cnt *
1350                                        rrd.stat_head->pdp_step))
1351                             -
1352                             ((rra_step_cnt[i] -
1353                               2) * rrd.rra_def[i].pdp_cnt *
1354                              rrd.stat_head->pdp_step);
1355                     }
1356                     pcdp_summary =
1357                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1358                                       scratch_idx, pcdp_summary, &rra_time);
1359                 }
1360
1361                 if (rrd_test_error())
1362                     break;
1363             }           /* RRA LOOP */
1364
1365             /* break out of the argument parsing loop if error_string is set */
1366             if (rrd_test_error()) {
1367                 free(step_start);
1368                 break;
1369             }
1370
1371         }               /* endif a pdp_st has occurred */
1372         rrd.live_head->last_up = current_time;
1373         rrd.live_head->last_up_usec = current_time_usec;
1374         free(step_start);
1375     }                   /* function argument loop */
1376
1377     if (seasonal_coef != NULL)
1378         free(seasonal_coef);
1379     if (last_seasonal_coef != NULL)
1380         free(last_seasonal_coef);
1381     if (rra_step_cnt != NULL)
1382         free(rra_step_cnt);
1383     rpnstack_free(&rpnstack);
1384
1385 #if 0
1386     //rrd_flush(rrd_file);    //XXX: really needed?
1387 #endif
1388     /* if we got here and if there is an error and if the file has not been
1389      * written to, then close things up and return. */
1390     if (rrd_test_error()) {
1391         goto err_free_pdp_new;
1392     }
1393
1394     /* aargh ... that was tough ... so many loops ... anyway, its done.
1395      * we just need to write back the live header portion now*/
1396
1397     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1398                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1399                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1400                  SEEK_SET) != 0) {
1401         rrd_set_error("seek rrd for live header writeback");
1402         goto err_free_pdp_new;
1403     }
1404     /* for mmap, we did already write to the underlying mapping, so we do
1405        not need to write again.  */
1406 #ifndef HAVE_MMAP
1407     if (version >= 3) {
1408         if (rrd_write(rrd_file, rrd.live_head,
1409                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1410             rrd_set_error("rrd_write live_head to rrd");
1411             goto err_free_pdp_new;
1412         }
1413     } else {
1414         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1415                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1416             rrd_set_error("rrd_write live_head to rrd");
1417             goto err_free_pdp_new;
1418         }
1419     }
1420
1421
1422     if (rrd_write(rrd_file, rrd.pdp_prep,
1423                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1424         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1425         rrd_set_error("rrd_write pdp_prep to rrd");
1426         goto err_free_pdp_new;
1427     }
1428
1429     if (rrd_write(rrd_file, rrd.cdp_prep,
1430                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1431                   rrd.stat_head->ds_cnt)
1432         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1433                       rrd.stat_head->ds_cnt)) {
1434
1435         rrd_set_error("rrd_write cdp_prep to rrd");
1436         goto err_free_pdp_new;
1437     }
1438
1439     if (rrd_write(rrd_file, rrd.rra_ptr,
1440                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1441         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1442         rrd_set_error("rrd_write rra_ptr to rrd");
1443         goto err_free_pdp_new;
1444     }
1445 #endif
1446 #ifdef HAVE_POSIX_FADVISExxx
1447
1448     /* with update we have write ops, so they will probably not be done by now, this means
1449        the buffers will not get freed. But calling this for the whole file - header
1450        will let the data off the hook as soon as it is written when if it is from a previous
1451        update cycle. Calling fdsync to force things is much too hard here. */
1452
1453     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1454         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1455                       rrd_strerror(errno));
1456         goto err_free_pdp_new;
1457     }
1458 #endif
1459     /* rrd_flush(rrd_file); */
1460
1461     /* calling the smoothing code here guarantees at most
1462      * one smoothing operation per rrd_update call. Unfortunately,
1463      * it is possible with bulk updates, or a long-delayed update
1464      * for smoothing to occur off-schedule. This really isn't
1465      * critical except during the burning cycles. */
1466     if (schedule_smooth) {
1467
1468         rra_start = rra_begin;
1469         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1470             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1471                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1472 #ifdef DEBUG
1473                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1474 #endif
1475                 apply_smoother(&rrd, i, rra_start, rrd_file);
1476                 if (rrd_test_error())
1477                     break;
1478             }
1479             rra_start += rrd.rra_def[i].row_cnt
1480                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1481         }
1482 #ifdef HAVE_POSIX_FADVISExxx
1483         /* same procedure as above ... */
1484         if (0 !=
1485             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1486             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1487                           rrd_strerror(errno));
1488             goto err_free_pdp_new;
1489         }
1490 #endif
1491     }
1492
1493     rrd_free(&rrd);
1494     rrd_close(rrd_file);
1495
1496     free(pdp_new);
1497     free(tmpl_idx);
1498     free(pdp_temp);
1499     free(updvals);
1500     return (0);
1501
1502   err_free_pdp_new:
1503     free(pdp_new);
1504   err_free_tmpl_idx:
1505     free(tmpl_idx);
1506   err_free_pdp_temp:
1507     free(pdp_temp);
1508   err_free_updvals:
1509     free(updvals);
1510   err_close:
1511     rrd_close(rrd_file);
1512   err_free:
1513     rrd_free(&rrd);
1514   err_out:
1515     return (-1);
1516 }
1517
/*
 * Try to get an exclusive lock on the whole file without blocking.
 * The lock gets removed when we close the file.
 *
 * in_file: open file descriptor of the file to lock.
 *
 * Returns the result of the underlying locking call:
 * 0 on success, -1 on failure (including a failed _fstat() on WIN32).
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() wants a byte count, so look up the file size first */
    if (_fstat(in_file, &st) == 0) {
        /* NOTE(review): _locking() presumably locks starting at the
         * current file position rather than at offset 0 -- confirm that
         * callers invoke this before moving the file position. */
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    /* zero everything so that members we do not set explicitly (l_pid
     * and any platform specific extras) are not handed to the kernel
     * as stack garbage */
    struct flock lock = { 0 };

    lock.l_type = F_WRLCK;  /* exclusive write lock */
    lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    lock.l_start = 0;   /* start of file */
    lock.l_len = 0; /* 0 means: up to the end of the file */

    /* F_SETLK fails right away instead of waiting for a busy lock */
    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}