Bernhard Fischer:
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
19
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
22
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday() or struct timeval. This is a quick
 * and dirty replacement.
 */
28 #include <sys/timeb.h>
29
30 #ifndef __MINGW32__
31 struct timeval {
32     time_t    tv_sec;   /* seconds */
33     long      tv_usec;  /* microseconds */
34 };
35 #endif
36
37 struct __timezone {
38     int       tz_minuteswest;   /* minutes W of Greenwich */
39     int       tz_dsttime;   /* type of dst correction */
40 };
41
42 static int gettimeofday(
43     struct timeval *t,
44     struct __timezone *tz)
45 {
46
47     struct _timeb current_time;
48
49     _ftime(&current_time);
50
51     t->tv_sec = current_time.time;
52     t->tv_usec = current_time.millitm * 1000;
53
54     return 0;
55 }
56
57 #endif
58 /*
59  * normalize time as returned by gettimeofday. usec part must
60  * be always >= 0
61  */
62 static inline void normalize_time(
63     struct timeval *t)
64 {
65     if (t->tv_usec < 0) {
66         t->tv_sec--;
67         t->tv_usec += 1000000L;
68     }
69 }
70
71 static inline info_t *write_RRA_row(
72     rrd_file_t *rrd_file,
73     rrd_t *rrd,
74     unsigned long rra_idx,
75     unsigned long *rra_current,
76     unsigned short CDP_scratch_idx,
77     info_t *pcdp_summary,
78     time_t *rra_time)
79 {
80     unsigned long ds_idx, cdp_idx;
81     infoval   iv;
82
83     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84         /* compute the cdp index */
85         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91         if (pcdp_summary != NULL) {
92             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93             /* append info to the return hash */
94             pcdp_summary = info_push(pcdp_summary,
95                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96                                                    *rra_time,
97                                                    rrd->rra_def[rra_idx].
98                                                    cf_nam,
99                                                    rrd->rra_def[rra_idx].
100                                                    pdp_cnt,
101                                                    rrd->ds_def[ds_idx].
102                                                    ds_nam), RD_I_VAL, iv);
103         }
104         if (rrd_write
105             (rrd_file,
106              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109             return 0;
110         }
111         *rra_current += sizeof(rrd_value_t);
112     }
113     return (pcdp_summary);
114 }
115
116 int       rrd_update_r(
117     const char *filename,
118     const char *tmplt,
119     int argc,
120     const char **argv);
121 int       _rrd_update(
122     const char *filename,
123     const char *tmplt,
124     int argc,
125     const char **argv,
126     info_t *);
127
128 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
129
130
131 info_t   *rrd_update_v(
132     int argc,
133     char **argv)
134 {
135     char     *tmplt = NULL;
136     info_t   *result = NULL;
137     infoval   rc;
138
139     rc.u_int = -1;
140     optind = 0;
141     opterr = 0;         /* initialize getopt */
142
143     while (1) {
144         static struct option long_options[] = {
145             {"template", required_argument, 0, 't'},
146             {0, 0, 0, 0}
147         };
148         int       option_index = 0;
149         int       opt;
150
151         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
152
153         if (opt == EOF)
154             break;
155
156         switch (opt) {
157         case 't':
158             tmplt = optarg;
159             break;
160
161         case '?':
162             rrd_set_error("unknown option '%s'", argv[optind - 1]);
163             goto end_tag;
164         }
165     }
166
167     /* need at least 2 arguments: filename, data. */
168     if (argc - optind < 2) {
169         rrd_set_error("Not enough arguments");
170         goto end_tag;
171     }
172     rc.u_int = 0;
173     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174     rc.u_int = _rrd_update(argv[optind], tmplt,
175                            argc - optind - 1,
176                            (const char **) (argv + optind + 1), result);
177     result->value.u_int = rc.u_int;
178   end_tag:
179     return result;
180 }
181
182 int rrd_update(
183     int argc,
184     char **argv)
185 {
186     char     *tmplt = NULL;
187     int       rc;
188
189     optind = 0;
190     opterr = 0;         /* initialize getopt */
191
192     while (1) {
193         static struct option long_options[] = {
194             {"template", required_argument, 0, 't'},
195             {0, 0, 0, 0}
196         };
197         int       option_index = 0;
198         int       opt;
199
200         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
201
202         if (opt == EOF)
203             break;
204
205         switch (opt) {
206         case 't':
207             tmplt = optarg;
208             break;
209
210         case '?':
211             rrd_set_error("unknown option '%s'", argv[optind - 1]);
212             return (-1);
213         }
214     }
215
216     /* need at least 2 arguments: filename, data. */
217     if (argc - optind < 2) {
218         rrd_set_error("Not enough arguments");
219
220         return -1;
221     }
222
223     rc = rrd_update_r(argv[optind], tmplt,
224                       argc - optind - 1, (const char **) (argv + optind + 1));
225     return rc;
226 }
227
228 int rrd_update_r(
229     const char *filename,
230     const char *tmplt,
231     int argc,
232     const char **argv)
233 {
234     return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
236
237 int _rrd_update(
238     const char *filename,
239     const char *tmplt,
240     int argc,
241     const char **argv,
242     info_t *pcdp_summary)
243 {
244
245     int       arg_i = 2;
246     short     j;
247     unsigned long i, ii, iii = 1;
248
249     unsigned long rra_begin;    /* byte pointer to the rra
250                                  * area in the rrd file.  this
251                                  * pointer never changes value */
252     unsigned long rra_start;    /* byte pointer to the rra
253                                  * area in the rrd file.  this
254                                  * pointer changes as each rrd is
255                                  * processed. */
256     unsigned long rra_current;  /* byte pointer to the current write
257                                  * spot in the rrd file. */
258     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
259     double    interval, pre_int, post_int;  /* interval between this and
260                                              * the last run */
261     unsigned long proc_pdp_st;  /* which pdp_st was the last
262                                  * to be processed */
263     unsigned long occu_pdp_st;  /* when was the pdp_st
264                                  * before the last update
265                                  * time */
266     unsigned long proc_pdp_age; /* how old was the data in
267                                  * the pdp prep area when it
268                                  * was last updated */
269     unsigned long occu_pdp_age; /* how long ago was the last
270                                  * pdp_step time */
271     rrd_value_t *pdp_new;   /* prepare the incoming data
272                              * to be added the the
273                              * existing entry */
274     rrd_value_t *pdp_temp;  /* prepare the pdp values
275                              * to be added the the
276                              * cdp values */
277
278     long     *tmpl_idx; /* index representing the settings
279                            transported by the tmplt index */
280     unsigned long tmpl_cnt = 2; /* time and data */
281
282     rrd_t     rrd;
283     time_t    current_time = 0;
284     time_t    rra_time = 0; /* time of update for a RRA */
285     unsigned long current_time_usec = 0;    /* microseconds part of current time */
286     struct timeval tmp_time;    /* used for time conversion */
287
288     char    **updvals;
289     int       schedule_smooth = 0;
290     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
291
292     /* a vector of future Holt-Winters seasonal coefs */
293     unsigned long elapsed_pdp_st;
294
295     /* number of elapsed PDP steps since last update */
296     unsigned long *rra_step_cnt = NULL;
297
298     /* number of rows to be updated in an RRA for a data
299      * value. */
300     unsigned long start_pdp_offset;
301
302     /* number of PDP steps since the last update that
303      * are assigned to the first CDP to be generated
304      * since the last update. */
305     unsigned short scratch_idx;
306
307     /* index into the CDP scratch array */
308     enum cf_en current_cf;
309
310     /* numeric id of the current consolidation function */
311     rpnstack_t rpnstack;    /* used for COMPUTE DS */
312     int       version;  /* rrd version */
313     char     *endptr;   /* used in the conversion */
314     rrd_file_t *rrd_file;
315
316     rpnstack_init(&rpnstack);
317
318     /* need at least 1 arguments: data. */
319     if (argc < 1) {
320         rrd_set_error("Not enough arguments");
321         goto err_out;
322     }
323
324     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325     if (rrd_file == NULL) {
326         goto err_free;
327     }
328     /* We are now at the beginning of the rra's */
329     rra_current = rra_start = rra_begin = rrd_file->header_len;
330
331     /* initialize time */
332     version = atoi(rrd.stat_head->version);
333     gettimeofday(&tmp_time, 0);
334     normalize_time(&tmp_time);
335     current_time = tmp_time.tv_sec;
336     if (version >= 3) {
337         current_time_usec = tmp_time.tv_usec;
338     } else {
339         current_time_usec = 0;
340     }
341
342     /* get exclusive lock to whole file.
343      * lock gets removed when we close the file.
344      */
345     if (LockRRD(rrd_file->fd) != 0) {
346         rrd_set_error("could not lock RRD");
347         goto err_close;
348     }
349
350     if ((updvals =
351          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352         rrd_set_error("allocating updvals pointer array");
353         goto err_close;
354     }
355
356     if ((pdp_temp = malloc(sizeof(rrd_value_t)
357                            * rrd.stat_head->ds_cnt)) == NULL) {
358         rrd_set_error("allocating pdp_temp ...");
359         goto err_free_updvals;
360     }
361
362     if ((tmpl_idx = malloc(sizeof(unsigned long)
363                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364         rrd_set_error("allocating tmpl_idx ...");
365         goto err_free_pdp_temp;
366     }
367     /* initialize tmplt redirector */
368     /* default config example (assume DS 1 is a CDEF DS)
369        tmpl_idx[0] -> 0; (time)
370        tmpl_idx[1] -> 1; (DS 0)
371        tmpl_idx[2] -> 3; (DS 2)
372        tmpl_idx[3] -> 4; (DS 3) */
373     tmpl_idx[0] = 0;    /* time */
374     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
376             tmpl_idx[ii++] = i;
377     }
378     tmpl_cnt = ii;
379
380     if (tmplt) {
381         /* we should work on a writeable copy here */
382         char     *dsname;
383         unsigned int tmpl_len;
384         char     *tmplt_copy = strdup(tmplt);
385
386         dsname = tmplt_copy;
387         tmpl_cnt = 1;   /* the first entry is the time */
388         tmpl_len = strlen(tmplt_copy);
389         for (i = 0; i <= tmpl_len; i++) {
390             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391                 tmplt_copy[i] = '\0';
392                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
393                     rrd_set_error
394                         ("tmplt contains more DS definitions than RRD");
395                     goto err_free_tmpl_idx;
396                 }
397                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398                     rrd_set_error("unknown DS name '%s'", dsname);
399                     goto err_free_tmpl_idx;
400                 } else {
401                     /* the first element is always the time */
402                     tmpl_idx[tmpl_cnt - 1]++;
403                     /* go to the next entry on the tmplt_copy */
404                     dsname = &tmplt_copy[i + 1];
405                     /* fix the damage we did before */
406                     if (i < tmpl_len) {
407                         tmplt_copy[i] = ':';
408                     }
409
410                 }
411             }
412         }
413         free(tmplt_copy);
414     }
415     if ((pdp_new = malloc(sizeof(rrd_value_t)
416                           * rrd.stat_head->ds_cnt)) == NULL) {
417         rrd_set_error("allocating pdp_new ...");
418         goto err_free_tmpl_idx;
419     }
420     /* loop through the arguments. */
421     for (arg_i = 0; arg_i < argc; arg_i++) {
422         char     *stepper = strdup(argv[arg_i]);
423         char     *step_start = stepper;
424         char     *p;
425         char     *parsetime_error = NULL;
426         enum { atstyle, normal } timesyntax;
427         struct rrd_time_value ds_tv;
428
429         if (stepper == NULL) {
430             rrd_set_error("failed duplication argv entry");
431             free(step_start);
432             goto err_free_pdp_new;
433         }
434         /* initialize all ds input to unknown except the first one
435            which has always got to be set */
436         memset(updvals + 1, 'U', rrd.stat_head->ds_cnt);
437         updvals[0] = stepper;
438         /* separate all ds elements; first must be examined separately
439            due to alternate time syntax */
440         if ((p = strchr(stepper, '@')) != NULL) {
441             timesyntax = atstyle;
442             *p = '\0';
443             stepper = p + 1;
444         } else if ((p = strchr(stepper, ':')) != NULL) {
445             timesyntax = normal;
446             *p = '\0';
447             stepper = p + 1;
448         } else {
449             rrd_set_error
450                 ("expected timestamp not found in data source from %s",
451                  argv[arg_i]);
452             free(step_start);
453             break;
454         }
455         ii = 1;
456         updvals[tmpl_idx[ii]] = stepper;
457         while (*stepper) {
458             if (*stepper == ':') {
459                 *stepper = '\0';
460                 ii++;
461                 if (ii < tmpl_cnt) {
462                     updvals[tmpl_idx[ii]] = stepper + 1;
463                 }
464             }
465             stepper++;
466         }
467
468         if (ii != tmpl_cnt - 1) {
469             rrd_set_error
470                 ("expected %lu data source readings (got %lu) from %s",
471                  tmpl_cnt - 1, ii, argv[arg_i]);
472             free(step_start);
473             break;
474         }
475
476         /* get the time from the reading ... handle N */
477         if (timesyntax == atstyle) {
478             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
479                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
480                 free(step_start);
481                 break;
482             }
483             if (ds_tv.type == RELATIVE_TO_END_TIME ||
484                 ds_tv.type == RELATIVE_TO_START_TIME) {
485                 rrd_set_error("specifying time relative to the 'start' "
486                               "or 'end' makes no sense here: %s", updvals[0]);
487                 free(step_start);
488                 break;
489             }
490
491             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
492
493             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
494
495         } else if (strcmp(updvals[0], "N") == 0) {
496             gettimeofday(&tmp_time, 0);
497             normalize_time(&tmp_time);
498             current_time = tmp_time.tv_sec;
499             current_time_usec = tmp_time.tv_usec;
500         } else {
501             double    tmp;
502
503             tmp = strtod(updvals[0], 0);
504             current_time = floor(tmp);
505             current_time_usec =
506                 (long) ((tmp - (double) current_time) * 1000000.0);
507         }
508         /* dont do any correction for old version RRDs */
509         if (version < 3)
510             current_time_usec = 0;
511
512         if (current_time < rrd.live_head->last_up ||
513             (current_time == rrd.live_head->last_up &&
514              (long) current_time_usec <=
515              (long) rrd.live_head->last_up_usec)) {
516             rrd_set_error("illegal attempt to update using time %ld when "
517                           "last update time is %ld (minimum one second step)",
518                           current_time, rrd.live_head->last_up);
519             free(step_start);
520             break;
521         }
522
523         /* seek to the beginning of the rra's */
524         if (rra_current != rra_begin) {
525 #ifndef HAVE_MMAP
526             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
527                 rrd_set_error("seek error in rrd");
528                 free(step_start);
529                 break;
530             }
531 #endif
532             rra_current = rra_begin;
533         }
534         rra_start = rra_begin;
535
536         /* when was the current pdp started */
537         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
538         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
539
540         /* when did the last pdp_st occur */
541         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
542         occu_pdp_st = current_time - occu_pdp_age;
543
544         /* interval = current_time - rrd.live_head->last_up; */
545         interval = (double) (current_time - rrd.live_head->last_up)
546             + (double) ((long) current_time_usec -
547                         (long) rrd.live_head->last_up_usec) / 1000000.0;
548
549         if (occu_pdp_st > proc_pdp_st) {
550             /* OK we passed the pdp_st moment */
551             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
552                                                                      * occurred before the latest
553                                                                      * pdp_st moment*/
554             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
555             post_int = occu_pdp_age;    /* how much after it */
556             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
557         } else {
558             pre_int = interval;
559             post_int = 0;
560         }
561
562 #ifdef DEBUG
563         printf("proc_pdp_age %lu\t"
564                "proc_pdp_st %lu\t"
565                "occu_pfp_age %lu\t"
566                "occu_pdp_st %lu\t"
567                "int %lf\t"
568                "pre_int %lf\t"
569                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
570                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
571 #endif
572
573         /* process the data sources and update the pdp_prep 
574          * area accordingly */
575         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
576             enum dst_en dst_idx;
577
578             dst_idx = dst_conv(rrd.ds_def[i].dst);
579
580             /* make sure we do not build diffs with old last_ds values */
581             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
582                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
583                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
584             }
585
586             /* NOTE: DST_CDEF should never enter this if block, because
587              * updvals[i+1][0] is initialized to 'U'; unless the caller
588              * accidently specified a value for the DST_CDEF. To handle
589              * this case, an extra check is required. */
590
591             if ((updvals[i + 1][0] != 'U') &&
592                 (dst_idx != DST_CDEF) &&
593                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
594                 double    rate = DNAN;
595
596                 /* the data source type defines how to process the data */
597                 /* pdp_new contains rate * time ... eg the bytes
598                  * transferred during the interval. Doing it this way saves
599                  * a lot of math operations */
600                 switch (dst_idx) {
601                 case DST_COUNTER:
602                 case DST_DERIVE:
603                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
604                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
605                             if ((updvals[i + 1][ii] < '0'
606                                  || updvals[i + 1][ii] > '9') && (ii != 0
607                                                                   && updvals[i
608                                                                              +
609                                                                              1]
610                                                                   [ii] !=
611                                                                   '-')) {
612                                 rrd_set_error("not a simple integer: '%s'",
613                                               updvals[i + 1]);
614                                 break;
615                             }
616                         }
617                         if (rrd_test_error()) {
618                             break;
619                         }
620                         pdp_new[i] =
621                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
622                         if (dst_idx == DST_COUNTER) {
623                             /* simple overflow catcher suggested by Andres Kroonmaa */
624                             /* this will fail terribly for non 32 or 64 bit counters ... */
625                             /* are there any others in SNMP land ? */
626                             if (pdp_new[i] < (double) 0.0)
627                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
628                             if (pdp_new[i] < (double) 0.0)
629                                 pdp_new[i] += (double) 18446744069414584320.0;
630                             /* 2^64-2^32 */ ;
631                         }
632                         rate = pdp_new[i] / interval;
633                     } else {
634                         pdp_new[i] = DNAN;
635                     }
636                     break;
637                 case DST_ABSOLUTE:
638                     errno = 0;
639                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
640                     if (errno > 0) {
641                         rrd_set_error("converting '%s' to float: %s",
642                                       updvals[i + 1], rrd_strerror(errno));
643                         break;
644                     };
645                     if (endptr[0] != '\0') {
646                         rrd_set_error
647                             ("conversion of '%s' to float not complete: tail '%s'",
648                              updvals[i + 1], endptr);
649                         break;
650                     }
651                     rate = pdp_new[i] / interval;
652                     break;
653                 case DST_GAUGE:
654                     errno = 0;
655                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
656                     if (errno > 0) {
657                         rrd_set_error("converting '%s' to float: %s",
658                                       updvals[i + 1], rrd_strerror(errno));
659                         break;
660                     };
661                     if (endptr[0] != '\0') {
662                         rrd_set_error
663                             ("conversion of '%s' to float not complete: tail '%s'",
664                              updvals[i + 1], endptr);
665                         break;
666                     }
667                     rate = pdp_new[i] / interval;
668                     break;
669                 default:
670                     rrd_set_error("rrd contains unknown DS type : '%s'",
671                                   rrd.ds_def[i].dst);
672                     break;
673                 }
674                 /* break out of this for loop if the error string is set */
675                 if (rrd_test_error()) {
676                     break;
677                 }
678                 /* make sure pdp_temp is neither too large or too small
679                  * if any of these occur it becomes unknown ...
680                  * sorry folks ... */
681                 if (!isnan(rate) &&
682                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
684                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
686                     pdp_new[i] = DNAN;
687                 }
688             } else {
689                 /* no news is news all the same */
690                 pdp_new[i] = DNAN;
691             }
692
693
694             /* make a copy of the command line argument for the next run */
695 #ifdef DEBUG
696             fprintf(stderr,
697                     "prep ds[%lu]\t"
698                     "last_arg '%s'\t"
699                     "this_arg '%s'\t"
700                     "pdp_new %10.2f\n",
701                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
702 #endif
703             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
704             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
705         }
706         /* break out of the argument parsing loop if the error_string is set */
707         if (rrd_test_error()) {
708             free(step_start);
709             break;
710         }
711         /* has a pdp_st moment occurred since the last run ? */
712
713         if (proc_pdp_st == occu_pdp_st) {
714             /* no we have not passed a pdp_st moment. therefore update is simple */
715
716             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
717                 if (isnan(pdp_new[i])) {
718                     /* this is not realy accurate if we use subsecond data arival time
719                        should have thought of it when going subsecond resolution ...
720                        sorry next format change we will have it! */
721                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
722                         floor(interval);
723                 } else {
724                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
725                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
726                     } else {
727                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
728                     }
729                 }
730 #ifdef DEBUG
731                 fprintf(stderr,
732                         "NO PDP  ds[%lu]\t"
733                         "value %10.2f\t"
734                         "unkn_sec %5lu\n",
735                         i,
736                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
737                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
738 #endif
739             }
740         } else {
741             /* an pdp_st has occurred. */
742
743             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
744                rate*seconds which occurred up to the last run.
745                pdp_new[] contains rate*seconds from the latest run.
746                pdp_temp[] will contain the rate for cdp */
747
748             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
749                 /* update pdp_prep to the current pdp_st. */
750                 double    pre_unknown = 0.0;
751
752                 if (isnan(pdp_new[i])) {
753                     /* a final bit of unkonwn to be added bevore calculation
754                        we use a temporary variable for this so that we
755                        don't have to turn integer lines before using the value */
756                     pre_unknown = pre_int;
757                 } else {
758                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
759                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
760                             pdp_new[i] / interval * pre_int;
761                     } else {
762                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
763                             pdp_new[i] / interval * pre_int;
764                     }
765                 }
766
767
768                 /* if too much of the pdp_prep is unknown we dump it */
769                 if (
770                        /* removed because this does not agree with the
771                           definition that a heartbeat can be unknown */
772                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
773                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
774                        /* if the interval is larger thatn mrhb we get NAN */
775                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
776                        (occu_pdp_st - proc_pdp_st <=
777                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
778                     pdp_temp[i] = DNAN;
779                 } else {
780                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
781                         / ((double) (occu_pdp_st - proc_pdp_st
782                                      -
783                                      rrd.pdp_prep[i].
784                                      scratch[PDP_unkn_sec_cnt].u_cnt)
785                            - pre_unknown);
786                 }
787
788                 /* process CDEF data sources; remember each CDEF DS can
789                  * only reference other DS with a lower index number */
790                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
791                     rpnp_t   *rpnp;
792
793                     rpnp =
794                         rpn_expand((rpn_cdefds_t *) &
795                                    (rrd.ds_def[i].par[DS_cdef]));
796                     /* substitute data values for OP_VARIABLE nodes */
797                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
798                         if (rpnp[ii].op == OP_VARIABLE) {
799                             rpnp[ii].op = OP_NUMBER;
800                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
801                         }
802                     }
803                     /* run the rpn calculator */
804                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
805                         free(rpnp);
806                         break;  /* exits the data sources pdp_temp loop */
807                     }
808                 }
809
810                 /* make pdp_prep ready for the next run */
811                 if (isnan(pdp_new[i])) {
812                     /* this is not really accurate if we use subsecond data arrival time
813                        should have thought of it when going subsecond resolution ...
814                        sorry next format change we will have it! */
815                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
816                         floor(post_int);
817                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
818                 } else {
819                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
820                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
821                         pdp_new[i] / interval * post_int;
822                 }
823
824 #ifdef DEBUG
825                 fprintf(stderr,
826                         "PDP UPD ds[%lu]\t"
827                         "pdp_temp %10.2f\t"
828                         "new_prep %10.2f\t"
829                         "new_unkn_sec %5lu\n",
830                         i, pdp_temp[i],
831                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
832                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
833 #endif
834             }
835
836             /* if there were errors during the last loop, bail out here */
837             if (rrd_test_error()) {
838                 free(step_start);
839                 break;
840             }
841
842             /* compute the number of elapsed pdp_st moments */
843             elapsed_pdp_st =
844                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
845 #ifdef DEBUG
846             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
847 #endif
848             if (rra_step_cnt == NULL) {
849                 rra_step_cnt = (unsigned long *)
850                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
851             }
852
853             for (i = 0, rra_start = rra_begin;
854                  i < rrd.stat_head->rra_cnt;
855                  rra_start +=
856                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
857                  sizeof(rrd_value_t), i++) {
858                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
859                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
860                     (proc_pdp_st / rrd.stat_head->pdp_step) %
861                     rrd.rra_def[i].pdp_cnt;
862                 if (start_pdp_offset <= elapsed_pdp_st) {
863                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
864                         rrd.rra_def[i].pdp_cnt + 1;
865                 } else {
866                     rra_step_cnt[i] = 0;
867                 }
868
869                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
870                     /* If this is a bulk update, we need to skip ahead in
871                        the seasonal arrays so that they will be correct for
872                        the next observed value;
873                        note that for the bulk update itself, no update will
874                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
875                        and DEVPREDICT will be set to DNAN. */
876                     if (rra_step_cnt[i] > 2) {
877                         /* skip update by resetting rra_step_cnt[i],
878                            note that this is not data source specific; this is
879                            due to the bulk update, not a DNAN value for the
880                            specific data source. */
881                         rra_step_cnt[i] = 0;
882                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
883                                         elapsed_pdp_st, &last_seasonal_coef);
884                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
885                                         elapsed_pdp_st + 1, &seasonal_coef);
886                     }
887
888                     /* periodically run a smoother for seasonal effects */
889                     /* Need to use first cdp parameter buffer to track
890                      * burnin (burnin requires a specific smoothing schedule).
891                      * The CDP_init_seasonal parameter is really an RRA level,
892                      * not a data source within RRA level parameter, but the rra_def
893                      * is read only for rrd_update (not flushed to disk). */
894                     iii = i * (rrd.stat_head->ds_cnt);
895                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
896                         <= BURNIN_CYCLES) {
897                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898                             > rrd.rra_def[i].row_cnt - 1) {
899                             /* mark off one of the burnin cycles */
900                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
901                                u_cnt);
902                             schedule_smooth = 1;
903                         }
904                     } else {
905                         /* someone has no doubt invented a trick to deal with this
906                          * wrap around, but at least this code is clear. */
907                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
908                             u_cnt > rrd.rra_ptr[i].cur_row) {
909                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
910                              * mapping between PDP and CDP */
911                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
912                                 >=
913                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
914                                 u_cnt) {
915 #ifdef DEBUG
916                                 fprintf(stderr,
917                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
918                                         rrd.rra_ptr[i].cur_row,
919                                         elapsed_pdp_st,
920                                         rrd.rra_def[i].
921                                         par[RRA_seasonal_smooth_idx].u_cnt);
922 #endif
923                                 schedule_smooth = 1;
924                             }
925                         } else {
926                             /* can't rely on negative numbers because we are working with
927                              * unsigned values */
928                             /* Don't need modulus here. If we've wrapped more than once, only
929                              * one smooth is executed at the end. */
930                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
931                                 rrd.rra_def[i].row_cnt
932                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
933                                 rrd.rra_def[i].row_cnt >=
934                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
935                                 u_cnt) {
936 #ifdef DEBUG
937                                 fprintf(stderr,
938                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
939                                         rrd.rra_ptr[i].cur_row,
940                                         elapsed_pdp_st,
941                                         rrd.rra_def[i].
942                                         par[RRA_seasonal_smooth_idx].u_cnt);
943 #endif
944                                 schedule_smooth = 1;
945                             }
946                         }
947                     }
948
949                     rra_current = rrd_tell(rrd_file);
950                 }
951                 /* if cf is DEVSEASONAL or SEASONAL */
952                 if (rrd_test_error())
953                     break;
954
955                 /* update CDP_PREP areas */
956                 /* loop over data sources within each RRA */
957                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
958
959                     /* iii indexes the CDP prep area for this data source within the RRA */
960                     iii = i * rrd.stat_head->ds_cnt + ii;
961
962                     if (rrd.rra_def[i].pdp_cnt > 1) {
963
964                         if (rra_step_cnt[i] > 0) {
965                             /* If we are in this block, at least 1 CDP value will be written to
966                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
967                              * to be written, then the "fill in" value is the CDP_secondary_val
968                              * entry. */
969                             if (isnan(pdp_temp[ii])) {
970                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
971                                     u_cnt += start_pdp_offset;
972                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
973                                     u_val = DNAN;
974                             } else {
975                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
976                                  * CDP data entries. No matter the CF, the value is the same because
977                                  * the average, max, min, and last of a list of identical values is
978                                  * the same, namely, the value itself. */
979                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
980                                     u_val = pdp_temp[ii];
981                             }
982
983                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
984                                 u_cnt >
985                                 rrd.rra_def[i].pdp_cnt *
986                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
987                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
988                                     u_val = DNAN;
989                                 /* initialize carry over */
990                                 if (current_cf == CF_AVERAGE) {
991                                     if (isnan(pdp_temp[ii])) {
992                                         rrd.cdp_prep[iii].scratch[CDP_val].
993                                             u_val = DNAN;
994                                     } else {
995                                         rrd.cdp_prep[iii].scratch[CDP_val].
996                                             u_val =
997                                             pdp_temp[ii] *
998                                             ((elapsed_pdp_st -
999                                               start_pdp_offset) %
1000                                              rrd.rra_def[i].pdp_cnt);
1001                                     }
1002                                 } else {
1003                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1004                                         pdp_temp[ii];
1005                                 }
1006                             } else {
1007                                 rrd_value_t cum_val, cur_val;
1008
1009                                 switch (current_cf) {
1010                                 case CF_AVERAGE:
1011                                     cum_val =
1012                                         IFDNAN(rrd.cdp_prep[iii].
1013                                                scratch[CDP_val].u_val, 0.0);
1014                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1015                                     rrd.cdp_prep[iii].
1016                                         scratch[CDP_primary_val].u_val =
1017                                         (cum_val +
1018                                          cur_val * start_pdp_offset) /
1019                                         (rrd.rra_def[i].pdp_cnt -
1020                                          rrd.cdp_prep[iii].
1021                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1022                                     /* initialize carry over value */
1023                                     if (isnan(pdp_temp[ii])) {
1024                                         rrd.cdp_prep[iii].scratch[CDP_val].
1025                                             u_val = DNAN;
1026                                     } else {
1027                                         rrd.cdp_prep[iii].scratch[CDP_val].
1028                                             u_val =
1029                                             pdp_temp[ii] *
1030                                             ((elapsed_pdp_st -
1031                                               start_pdp_offset) %
1032                                              rrd.rra_def[i].pdp_cnt);
1033                                     }
1034                                     break;
1035                                 case CF_MAXIMUM:
1036                                     cum_val =
1037                                         IFDNAN(rrd.cdp_prep[iii].
1038                                                scratch[CDP_val].u_val, -DINF);
1039                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1040 #ifdef DEBUG
1041                                     if (isnan
1042                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1043                                          u_val) && isnan(pdp_temp[ii])) {
1044                                         fprintf(stderr,
1045                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1046                                                 i, ii);
1047                                         exit(-1);
1048                                     }
1049 #endif
1050                                     if (cur_val > cum_val)
1051                                         rrd.cdp_prep[iii].
1052                                             scratch[CDP_primary_val].u_val =
1053                                             cur_val;
1054                                     else
1055                                         rrd.cdp_prep[iii].
1056                                             scratch[CDP_primary_val].u_val =
1057                                             cum_val;
1058                                     /* initialize carry over value */
1059                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1060                                         pdp_temp[ii];
1061                                     break;
1062                                 case CF_MINIMUM:
1063                                     cum_val =
1064                                         IFDNAN(rrd.cdp_prep[iii].
1065                                                scratch[CDP_val].u_val, DINF);
1066                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1067 #ifdef DEBUG
1068                                     if (isnan
1069                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1070                                          u_val) && isnan(pdp_temp[ii])) {
1071                                         fprintf(stderr,
1072                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1073                                                 i, ii);
1074                                         exit(-1);
1075                                     }
1076 #endif
1077                                     if (cur_val < cum_val)
1078                                         rrd.cdp_prep[iii].
1079                                             scratch[CDP_primary_val].u_val =
1080                                             cur_val;
1081                                     else
1082                                         rrd.cdp_prep[iii].
1083                                             scratch[CDP_primary_val].u_val =
1084                                             cum_val;
1085                                     /* initialize carry over value */
1086                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1087                                         pdp_temp[ii];
1088                                     break;
1089                                 case CF_LAST:
1090                                 default:
1091                                     rrd.cdp_prep[iii].
1092                                         scratch[CDP_primary_val].u_val =
1093                                         pdp_temp[ii];
1094                                     /* initialize carry over value */
1095                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1096                                         pdp_temp[ii];
1097                                     break;
1098                                 }
1099                             }   /* endif meets xff value requirement for a valid value */
1100                             /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1101                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1102                             if (isnan(pdp_temp[ii]))
1103                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1104                                     u_cnt =
1105                                     (elapsed_pdp_st -
1106                                      start_pdp_offset) %
1107                                     rrd.rra_def[i].pdp_cnt;
1108                             else
1109                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1110                                     u_cnt = 0;
1111                         } else {    /* rra_step_cnt[i]  == 0 */
1112
1113 #ifdef DEBUG
1114                             if (isnan
1115                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1116                                 fprintf(stderr,
1117                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1118                                         i, ii);
1119                             } else {
1120                                 fprintf(stderr,
1121                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1122                                         i, ii,
1123                                         rrd.cdp_prep[iii].scratch[CDP_val].
1124                                         u_val);
1125                             }
1126 #endif
1127                             if (isnan(pdp_temp[ii])) {
1128                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1129                                     u_cnt += elapsed_pdp_st;
1130                             } else
1131                                 if (isnan
1132                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1133                                      u_val)) {
1134                                 if (current_cf == CF_AVERAGE) {
1135                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1136                                         pdp_temp[ii] * elapsed_pdp_st;
1137                                 } else {
1138                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1139                                         pdp_temp[ii];
1140                                 }
1141 #ifdef DEBUG
1142                                 fprintf(stderr,
1143                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1144                                         i, ii,
1145                                         rrd.cdp_prep[iii].scratch[CDP_val].
1146                                         u_val);
1147 #endif
1148                             } else {
1149                                 switch (current_cf) {
1150                                 case CF_AVERAGE:
1151                                     rrd.cdp_prep[iii].scratch[CDP_val].
1152                                         u_val +=
1153                                         pdp_temp[ii] * elapsed_pdp_st;
1154                                     break;
1155                                 case CF_MINIMUM:
1156                                     if (pdp_temp[ii] <
1157                                         rrd.cdp_prep[iii].scratch[CDP_val].
1158                                         u_val)
1159                                         rrd.cdp_prep[iii].scratch[CDP_val].
1160                                             u_val = pdp_temp[ii];
1161                                     break;
1162                                 case CF_MAXIMUM:
1163                                     if (pdp_temp[ii] >
1164                                         rrd.cdp_prep[iii].scratch[CDP_val].
1165                                         u_val)
1166                                         rrd.cdp_prep[iii].scratch[CDP_val].
1167                                             u_val = pdp_temp[ii];
1168                                     break;
1169                                 case CF_LAST:
1170                                 default:
1171                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1172                                         pdp_temp[ii];
1173                                     break;
1174                                 }
1175                             }
1176                         }
1177                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1178                         if (elapsed_pdp_st > 2) {
1179                             switch (current_cf) {
1180                             case CF_AVERAGE:
1181                             default:
1182                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1183                                     u_val = pdp_temp[ii];
1184                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1185                                     u_val = pdp_temp[ii];
1186                                 break;
1187                             case CF_SEASONAL:
1188                             case CF_DEVSEASONAL:
1189                                 /* need to update cached seasonal values, so they are consistent
1190                                  * with the bulk update */
1191                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1192                                  * CDP_last_deviation are the same. */
1193                                 rrd.cdp_prep[iii].
1194                                     scratch[CDP_hw_last_seasonal].u_val =
1195                                     last_seasonal_coef[ii];
1196                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1197                                     u_val = seasonal_coef[ii];
1198                                 break;
1199                             case CF_HWPREDICT:
1200                                 /* need to update the null_count and last_null_count.
1201                                  * even do this for non-DNAN pdp_temp because the
1202                                  * algorithm is not learning from batch updates. */
1203                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1204                                     u_cnt += elapsed_pdp_st;
1205                                 rrd.cdp_prep[iii].
1206                                     scratch[CDP_last_null_count].u_cnt +=
1207                                     elapsed_pdp_st - 1;
1208                                 /* fall through */
1209                             case CF_DEVPREDICT:
1210                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1211                                     u_val = DNAN;
1212                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1213                                     u_val = DNAN;
1214                                 break;
1215                             case CF_FAILURES:
1216                                 /* do not count missed bulk values as failures */
1217                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1218                                     u_val = 0;
1219                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1220                                     u_val = 0;
1221                                 /* need to reset violations buffer.
1222                                  * could do this more carefully, but for now, just
1223                                  * assume a bulk update wipes away all violations. */
1224                                 erase_violations(&rrd, iii, i);
1225                                 break;
1226                             }
1227                         }
1228                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1229
1230                     if (rrd_test_error())
1231                         break;
1232
1233                 }       /* endif data sources loop */
1234             }           /* end RRA Loop */
1235
1236             /* this loop is only entered if elapsed_pdp_st < 3 */
1237             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1238                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1239                 for (i = 0, rra_start = rra_begin;
1240                      i < rrd.stat_head->rra_cnt;
1241                      rra_start +=
1242                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1243                      sizeof(rrd_value_t), i++) {
1244                     if (rrd.rra_def[i].pdp_cnt > 1)
1245                         continue;
1246
1247                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1248                     if (current_cf == CF_SEASONAL
1249                         || current_cf == CF_DEVSEASONAL) {
1250                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1251                                         elapsed_pdp_st + (scratch_idx ==
1252                                                           CDP_primary_val ? 1
1253                                                           : 2),
1254                                         &seasonal_coef);
1255                         rra_current = rrd_tell(rrd_file);
1256                     }
1257                     if (rrd_test_error())
1258                         break;
1259                     /* loop over data sources within each RRA */
1260                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1261                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1262                                            i * (rrd.stat_head->ds_cnt) + ii,
1263                                            i, ii, scratch_idx, seasonal_coef);
1264                     }
1265                 }       /* end RRA Loop */
1266                 if (rrd_test_error())
1267                     break;
1268             }           /* end elapsed_pdp_st loop */
1269
1270             if (rrd_test_error())
1271                 break;
1272
1273             /* Ready to write to disk */
1274             /* Move sequentially through the file, writing one RRA at a time.
1275              * Note this architecture divorces the computation of CDP with
1276              * flushing updated RRA entries to disk. */
1277             for (i = 0, rra_start = rra_begin;
1278                  i < rrd.stat_head->rra_cnt;
1279                  rra_start +=
1280                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1281                  sizeof(rrd_value_t), i++) {
1282                 /* is there anything to write for this RRA? If not, continue. */
1283                 if (rra_step_cnt[i] == 0)
1284                     continue;
1285
1286                 /* write the first row */
1287 #ifdef DEBUG
1288                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1289 #endif
1290                 rrd.rra_ptr[i].cur_row++;
1291                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1292                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1293                 /* position on the first row */
1294                 rra_pos_tmp = rra_start +
1295                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1296                     sizeof(rrd_value_t);
1297                 if (rra_pos_tmp != rra_current) {
1298                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1299                         rrd_set_error("seek error in rrd");
1300                         break;
1301                     }
1302                     rra_current = rra_pos_tmp;
1303                 }
1304 #ifdef DEBUG
1305                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1306 #endif
1307                 scratch_idx = CDP_primary_val;
1308                 if (pcdp_summary != NULL) {
1309                     rra_time = (current_time - current_time
1310                                 % (rrd.rra_def[i].pdp_cnt *
1311                                    rrd.stat_head->pdp_step))
1312                         -
1313                         ((rra_step_cnt[i] -
1314                           1) * rrd.rra_def[i].pdp_cnt *
1315                          rrd.stat_head->pdp_step);
1316                 }
1317                 pcdp_summary =
1318                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1319                                   scratch_idx, pcdp_summary, &rra_time);
1320                 if (rrd_test_error())
1321                     break;
1322
1323                 /* write other rows of the bulk update, if any */
1324                 scratch_idx = CDP_secondary_val;
1325                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1326                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1327 #ifdef DEBUG
1328                         fprintf(stderr,
1329                                 "Wraparound for RRA %s, %lu updates left\n",
1330                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1331 #endif
1332                         /* wrap */
1333                         rrd.rra_ptr[i].cur_row = 0;
1334                         /* seek back to beginning of current rra */
1335                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1336                             rrd_set_error("seek error in rrd");
1337                             break;
1338                         }
1339 #ifdef DEBUG
1340                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1341                                 rrd_file->pos);
1342 #endif
1343                         rra_current = rra_start;
1344                     }
1345                     if (pcdp_summary != NULL) {
1346                         rra_time = (current_time - current_time
1347                                     % (rrd.rra_def[i].pdp_cnt *
1348                                        rrd.stat_head->pdp_step))
1349                             -
1350                             ((rra_step_cnt[i] -
1351                               2) * rrd.rra_def[i].pdp_cnt *
1352                              rrd.stat_head->pdp_step);
1353                     }
1354                     pcdp_summary =
1355                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1356                                       scratch_idx, pcdp_summary, &rra_time);
1357                 }
1358
1359                 if (rrd_test_error())
1360                     break;
1361             }           /* RRA LOOP */
1362
1363             /* break out of the argument parsing loop if error_string is set */
1364             if (rrd_test_error()) {
1365                 free(step_start);
1366                 break;
1367             }
1368
1369         }               /* endif a pdp_st has occurred */
1370         rrd.live_head->last_up = current_time;
1371         rrd.live_head->last_up_usec = current_time_usec;
1372         free(step_start);
1373     }                   /* function argument loop */
1374
1375     if (seasonal_coef != NULL)
1376         free(seasonal_coef);
1377     if (last_seasonal_coef != NULL)
1378         free(last_seasonal_coef);
1379     if (rra_step_cnt != NULL)
1380         free(rra_step_cnt);
1381     rpnstack_free(&rpnstack);
1382
1383 #if 0
1384     //rrd_flush(rrd_file);    //XXX: really needed?
1385 #endif
1386     /* if we got here and if there is an error and if the file has not been
1387      * written to, then close things up and return. */
1388     if (rrd_test_error()) {
1389         goto err_free_pdp_new;
1390     }
1391
1392     /* aargh ... that was tough ... so many loops ... anyway, its done.
1393      * we just need to write back the live header portion now*/
1394
1395     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1396                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1397                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1398                  SEEK_SET) != 0) {
1399         rrd_set_error("seek rrd for live header writeback");
1400         goto err_free_pdp_new;
1401     }
1402     /* for mmap, we did already write to the underlying mapping, so we do
1403        not need to write again.  */
1404 #ifndef HAVE_MMAP
1405     if (version >= 3) {
1406         if (rrd_write(rrd_file, rrd.live_head,
1407                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1408             rrd_set_error("rrd_write live_head to rrd");
1409             goto err_free_pdp_new;
1410         }
1411     } else {
1412         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1413                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1414             rrd_set_error("rrd_write live_head to rrd");
1415             goto err_free_pdp_new;
1416         }
1417     }
1418
1419
1420     if (rrd_write(rrd_file, rrd.pdp_prep,
1421                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1422         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1423         rrd_set_error("rrd_write pdp_prep to rrd");
1424         goto err_free_pdp_new;
1425     }
1426
1427     if (rrd_write(rrd_file, rrd.cdp_prep,
1428                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1429                   rrd.stat_head->ds_cnt)
1430         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1431                       rrd.stat_head->ds_cnt)) {
1432
1433         rrd_set_error("rrd_write cdp_prep to rrd");
1434         goto err_free_pdp_new;
1435     }
1436
1437     if (rrd_write(rrd_file, rrd.rra_ptr,
1438                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1439         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1440         rrd_set_error("rrd_write rra_ptr to rrd");
1441         goto err_free_pdp_new;
1442     }
1443 #endif
1444 #ifdef HAVE_POSIX_FADVISExxx
1445
1446     /* with update we have write ops, so they will probably not be done by now, this means
1447        the buffers will not get freed. But calling this for the whole file - header
1448        will let the data off the hook as soon as it is written when if it is from a previous
1449        update cycle. Calling fdsync to force things is much too hard here. */
1450
1451     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1452         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1453                       rrd_strerror(errno));
1454         goto err_free_pdp_new;
1455     }
1456 #endif
1457     /* rrd_flush(rrd_file); */
1458
1459     /* calling the smoothing code here guarantees at most
1460      * one smoothing operation per rrd_update call. Unfortunately,
1461      * it is possible with bulk updates, or a long-delayed update
1462      * for smoothing to occur off-schedule. This really isn't
1463      * critical except during the burning cycles. */
1464     if (schedule_smooth) {
1465
1466         rra_start = rra_begin;
1467         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1468             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1469                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1470 #ifdef DEBUG
1471                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1472 #endif
1473                 apply_smoother(&rrd, i, rra_start, rrd_file);
1474                 if (rrd_test_error())
1475                     break;
1476             }
1477             rra_start += rrd.rra_def[i].row_cnt
1478                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1479         }
1480 #ifdef HAVE_POSIX_FADVISExxx
1481         /* same procedure as above ... */
1482         if (0 !=
1483             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1484             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1485                           rrd_strerror(errno));
1486             goto err_free_pdp_new;
1487         }
1488 #endif
1489     }
1490
1491     rrd_free(&rrd);
1492     rrd_close(rrd_file);
1493
1494     free(pdp_new);
1495     free(tmpl_idx);
1496     free(pdp_temp);
1497     free(updvals);
1498     return (0);
1499
1500   err_free_pdp_new:
1501     free(pdp_new);
1502   err_free_tmpl_idx:
1503     free(tmpl_idx);
1504   err_free_pdp_temp:
1505     free(pdp_temp);
1506   err_free_updvals:
1507     free(updvals);
1508   err_close:
1509     rrd_close(rrd_file);
1510   err_free:
1511     rrd_free(&rrd);
1512   err_out:
1513     return (-1);
1514 }
1515
/*
 * Try to get an exclusive lock on the whole file without blocking.
 *
 * in_file: open file descriptor of the file to lock.
 *
 * The lock is never released explicitly here; it gets removed when the
 * file is closed.
 *
 * returns the result of the underlying fcntl()/_locking() call:
 * 0 on success, -1 if the lock could not be obtained (e.g. it is already
 * held elsewhere) or the call failed for another reason.
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() wants an explicit byte count, so look up the file size */
    if (_fstat(in_file, &st) != 0) {
        return -1;
    }

    /* _LK_NBLCK: give up immediately instead of retrying.
     * NOTE(review): _locking() measures the region from the current file
     * position, so this covers the whole file only while the descriptor is
     * still positioned at offset 0 -- confirm against the callers. */
    return _locking(in_file, _LK_NBLCK, st.st_size);
#else
    /* The initializer zeroes every member not named below (l_pid and any
     * platform specific extras), so nothing indeterminate reaches fcntl(). */
    struct flock lock = {
        .l_type = F_WRLCK,      /* exclusive write lock */
        .l_whence = SEEK_SET,   /* l_start counts from the start of file */
        .l_start = 0,           /* begin at the first byte */
        .l_len = 0              /* 0 = up to end of file, i.e. whole file */
    };

    /* F_SETLK does not wait: a conflicting lock makes fcntl() return -1 */
    return fcntl(in_file, F_SETLK, &lock);
#endif
}