a34da71574cdd3f5f73061262bb8f67d37860496
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
/* Stand-in for the POSIX struct timeval, compiled only on native Win32
 * builds that are not MinGW (see the surrounding #ifndef __MINGW32__). */
struct timeval {
    time_t tv_sec;      /* whole seconds */
    long tv_usec;       /* microseconds within the current second */
};
40 #endif
41
/* Time zone record taken as the second argument of the Win32
 * gettimeofday() replacement defined in this file. */
struct __timezone {
    int tz_minuteswest; /* minutes west of Greenwich */
    int tz_dsttime;     /* kind of daylight-saving correction */
};
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
/*
 * Normalize a time value as returned by gettimeofday() so that the
 * usec part is always >= 0.
 *
 * A negative tv_usec is folded into tv_sec by borrowing as many whole
 * seconds as are needed, leaving tv_usec in the range [0, 999999].
 * Values whose tv_usec is already non-negative are left untouched.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        /* C division truncates toward zero, so the quotient is the
         * (non-positive) number of whole seconds contained in tv_usec
         * and the remainder lies in (-1000000, 0]. */
        t->tv_sec += t->tv_usec / 1000000L;
        t->tv_usec %= 1000000L;
        if (t->tv_usec < 0) {
            /* borrow one more second to make the remainder positive */
            t->tv_sec--;
            t->tv_usec += 1000000L;
        }
    }
}
75
76 /* Local prototypes */
77 int       LockRRD(
78     int in_file);
79
80 #ifdef HAVE_MMAP
81 info_t   *write_RRA_row(
82     rrd_t *rrd,
83     unsigned long rra_idx,
84     unsigned long *rra_current,
85     unsigned short CDP_scratch_idx,
86 #ifndef DEBUG
87     int UNUSED(in_file),
88 #else
89     int in_file,
90 #endif
91     info_t *pcdp_summary,
92     time_t *rra_time,
93     void *rrd_mmaped_file);
94 #else
95 info_t   *write_RRA_row(
96     rrd_t *rrd,
97     unsigned long rra_idx,
98     unsigned long *rra_current,
99     unsigned short CDP_scratch_idx,
100     int in_file,
101     info_t *pcdp_summary,
102     time_t *rra_time);
103 #endif
104 int       rrd_update_r(
105     const char *filename,
106     const char *tmplt,
107     int argc,
108     const char **argv);
109 int       _rrd_update(
110     const char *filename,
111     const char *tmplt,
112     int argc,
113     const char **argv,
114     info_t *);
115
/* Yield X unless it is NaN, in which case yield Y.  X is evaluated
 * twice, so it must be free of side effects.  The expansion carries no
 * trailing semicolon, so the macro can be used inside larger expressions
 * and does not leave a stray empty statement behind. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
117
118
119 info_t   *rrd_update_v(
120     int argc,
121     char **argv)
122 {
123     char     *tmplt = NULL;
124     info_t   *result = NULL;
125     infoval   rc;
126
127     rc.u_int = -1;
128     optind = 0;
129     opterr = 0;         /* initialize getopt */
130
131     while (1) {
132         static struct option long_options[] = {
133             {"template", required_argument, 0, 't'},
134             {0, 0, 0, 0}
135         };
136         int       option_index = 0;
137         int       opt;
138
139         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
140
141         if (opt == EOF)
142             break;
143
144         switch (opt) {
145         case 't':
146             tmplt = optarg;
147             break;
148
149         case '?':
150             rrd_set_error("unknown option '%s'", argv[optind - 1]);
151             goto end_tag;
152         }
153     }
154
155     /* need at least 2 arguments: filename, data. */
156     if (argc - optind < 2) {
157         rrd_set_error("Not enough arguments");
158         goto end_tag;
159     }
160     rc.u_int = 0;
161     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162     rc.u_int = _rrd_update(argv[optind], tmplt,
163                            argc - optind - 1,
164                            (const char **) (argv + optind + 1), result);
165     result->value.u_int = rc.u_int;
166   end_tag:
167     return result;
168 }
169
170 int rrd_update(
171     int argc,
172     char **argv)
173 {
174     char     *tmplt = NULL;
175     int       rc;
176
177     optind = 0;
178     opterr = 0;         /* initialize getopt */
179
180     while (1) {
181         static struct option long_options[] = {
182             {"template", required_argument, 0, 't'},
183             {0, 0, 0, 0}
184         };
185         int       option_index = 0;
186         int       opt;
187
188         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
189
190         if (opt == EOF)
191             break;
192
193         switch (opt) {
194         case 't':
195             tmplt = optarg;
196             break;
197
198         case '?':
199             rrd_set_error("unknown option '%s'", argv[optind - 1]);
200             return (-1);
201         }
202     }
203
204     /* need at least 2 arguments: filename, data. */
205     if (argc - optind < 2) {
206         rrd_set_error("Not enough arguments");
207
208         return -1;
209     }
210
211     rc = rrd_update_r(argv[optind], tmplt,
212                       argc - optind - 1, (const char **) (argv + optind + 1));
213     return rc;
214 }
215
216 int rrd_update_r(
217     const char *filename,
218     const char *tmplt,
219     int argc,
220     const char **argv)
221 {
222     return _rrd_update(filename, tmplt, argc, argv, NULL);
223 }
224
225 int _rrd_update(
226     const char *filename,
227     const char *tmplt,
228     int argc,
229     const char **argv,
230     info_t *pcdp_summary)
231 {
232
233     int       arg_i = 2;
234     short     j;
235     unsigned long i, ii, iii = 1;
236
237     unsigned long rra_begin;    /* byte pointer to the rra
238                                  * area in the rrd file.  this
239                                  * pointer never changes value */
240     unsigned long rra_start;    /* byte pointer to the rra
241                                  * area in the rrd file.  this
242                                  * pointer changes as each rrd is
243                                  * processed. */
244     unsigned long rra_current;  /* byte pointer to the current write
245                                  * spot in the rrd file. */
246     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
247     double    interval, pre_int, post_int;  /* interval between this and
248                                              * the last run */
249     unsigned long proc_pdp_st;  /* which pdp_st was the last
250                                  * to be processed */
251     unsigned long occu_pdp_st;  /* when was the pdp_st
252                                  * before the last update
253                                  * time */
254     unsigned long proc_pdp_age; /* how old was the data in
255                                  * the pdp prep area when it
256                                  * was last updated */
257     unsigned long occu_pdp_age; /* how long ago was the last
258                                  * pdp_step time */
259     rrd_value_t *pdp_new;   /* prepare the incoming data
260                              * to be added the the
261                              * existing entry */
262     rrd_value_t *pdp_temp;  /* prepare the pdp values 
263                              * to be added the the
264                              * cdp values */
265
266     long     *tmpl_idx; /* index representing the settings
267                            transported by the tmplt index */
268     unsigned long tmpl_cnt = 2; /* time and data */
269
270     rrd_t     rrd;
271     time_t    current_time = 0;
272     time_t    rra_time = 0; /* time of update for a RRA */
273     unsigned long current_time_usec = 0;    /* microseconds part of current time */
274     struct timeval tmp_time;    /* used for time conversion */
275
276     char    **updvals;
277     int       schedule_smooth = 0;
278     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
279
280     /* a vector of future Holt-Winters seasonal coefs */
281     unsigned long elapsed_pdp_st;
282
283     /* number of elapsed PDP steps since last update */
284     unsigned long *rra_step_cnt = NULL;
285
286     /* number of rows to be updated in an RRA for a data
287      * value. */
288     unsigned long start_pdp_offset;
289
290     /* number of PDP steps since the last update that
291      * are assigned to the first CDP to be generated
292      * since the last update. */
293     unsigned short scratch_idx;
294
295     /* index into the CDP scratch array */
296     enum cf_en current_cf;
297
298     /* numeric id of the current consolidation function */
299     rpnstack_t rpnstack;    /* used for COMPUTE DS */
300     int       version;  /* rrd version */
301     char     *endptr;   /* used in the conversion */
302     rrd_file_t *rrd_file;
303
304     rpnstack_init(&rpnstack);
305
306     /* need at least 1 arguments: data. */
307     if (argc < 1) {
308         rrd_set_error("Not enough arguments");
309         return -1;
310     }
311
312     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313     if (rrd_file == NULL) {
314         return -1;
315     }
316
317     /* initialize time */
318     version = atoi(rrd.stat_head->version);
319     gettimeofday(&tmp_time, 0);
320     normalize_time(&tmp_time);
321     current_time = tmp_time.tv_sec;
322     if (version >= 3) {
323         current_time_usec = tmp_time.tv_usec;
324     } else {
325         current_time_usec = 0;
326     }
327
328     rra_current = rra_start = rra_begin = rrd_file->header_len;
329     /* This is defined in the ANSI C standard, section 7.9.5.3:
330
331        When a file is opened with udpate mode ('+' as the second
332        or third character in the ... list of mode argument
333        variables), both input and output may be performed on the
334        associated stream.  However, ...  input may not be directly
335        followed by output without an intervening call to a file
336        positioning function, unless the input operation encounters
337        end-of-file. */
338 #if 0                   //def HAVE_MMAP
339     rrd_filesize = rrd_file->file_size;
340     fseek(rrd_file->fd, 0, SEEK_END);
341     rrd_filesize = ftell(rrd_file->fd);
342     fseek(rrd_file->fd, rra_current, SEEK_SET);
343 #else
344 //    fseek(rrd_file->fd, 0, SEEK_CUR);
345 #endif
346
347
348     /* get exclusive lock to whole file.
349      * lock gets removed when we close the file.
350      */
351     if (LockRRD(rrd_file->fd) != 0) {
352         rrd_set_error("could not lock RRD");
353         rrd_free(&rrd);
354         close(rrd_file->fd);
355         return (-1);
356     }
357
358     if ((updvals =
359          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360         rrd_set_error("allocating updvals pointer array");
361         rrd_free(&rrd);
362         close(rrd_file->fd);
363         return (-1);
364     }
365
366     if ((pdp_temp = malloc(sizeof(rrd_value_t)
367                            * rrd.stat_head->ds_cnt)) == NULL) {
368         rrd_set_error("allocating pdp_temp ...");
369         free(updvals);
370         rrd_free(&rrd);
371         close(rrd_file->fd);
372         return (-1);
373     }
374
375     if ((tmpl_idx = malloc(sizeof(unsigned long)
376                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377         rrd_set_error("allocating tmpl_idx ...");
378         free(pdp_temp);
379         free(updvals);
380         rrd_free(&rrd);
381         close(rrd_file->fd);
382         return (-1);
383     }
384     /* initialize tmplt redirector */
385     /* default config example (assume DS 1 is a CDEF DS)
386        tmpl_idx[0] -> 0; (time)
387        tmpl_idx[1] -> 1; (DS 0)
388        tmpl_idx[2] -> 3; (DS 2)
389        tmpl_idx[3] -> 4; (DS 3) */
390     tmpl_idx[0] = 0;    /* time */
391     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
393             tmpl_idx[ii++] = i;
394     }
395     tmpl_cnt = ii;
396
397     if (tmplt) {
398         /* we should work on a writeable copy here */
399         char     *dsname;
400         unsigned int tmpl_len;
401         char     *tmplt_copy = strdup(tmplt);
402
403         dsname = tmplt_copy;
404         tmpl_cnt = 1;   /* the first entry is the time */
405         tmpl_len = strlen(tmplt_copy);
406         for (i = 0; i <= tmpl_len; i++) {
407             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408                 tmplt_copy[i] = '\0';
409                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
410                     rrd_set_error
411                         ("tmplt contains more DS definitions than RRD");
412                     free(updvals);
413                     free(pdp_temp);
414                     free(tmpl_idx);
415                     rrd_free(&rrd);
416                     close(rrd_file->fd);
417                     return (-1);
418                 }
419                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420                     rrd_set_error("unknown DS name '%s'", dsname);
421                     free(updvals);
422                     free(pdp_temp);
423                     free(tmplt_copy);
424                     free(tmpl_idx);
425                     rrd_free(&rrd);
426                     close(rrd_file->fd);
427                     return (-1);
428                 } else {
429                     /* the first element is always the time */
430                     tmpl_idx[tmpl_cnt - 1]++;
431                     /* go to the next entry on the tmplt_copy */
432                     dsname = &tmplt_copy[i + 1];
433                     /* fix the damage we did before */
434                     if (i < tmpl_len) {
435                         tmplt_copy[i] = ':';
436                     }
437
438                 }
439             }
440         }
441         free(tmplt_copy);
442     }
443     if ((pdp_new = malloc(sizeof(rrd_value_t)
444                           * rrd.stat_head->ds_cnt)) == NULL) {
445         rrd_set_error("allocating pdp_new ...");
446         free(updvals);
447         free(pdp_temp);
448         free(tmpl_idx);
449         rrd_free(&rrd);
450         close(rrd_file->fd);
451         return (-1);
452     }
453 #if 0                   //def HAVE_MMAP
454     rrd_mmaped_file = mmap(0,
455                            rrd_file->file_len,
456                            PROT_READ | PROT_WRITE,
457                            MAP_SHARED, fileno(in_file), 0);
458     if (rrd_mmaped_file == MAP_FAILED) {
459         rrd_set_error("error mmapping file %s", filename);
460         free(updvals);
461         free(pdp_temp);
462         free(tmpl_idx);
463         rrd_free(&rrd);
464         close(rrd_file->fd);
465         return (-1);
466     }
467 #ifdef USE_MADVISE
468     /* when we use mmaping we tell the kernel the mmap equivalent
469        of POSIX_FADV_RANDOM */
470     madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
471 #endif
472 #endif
473     /* loop through the arguments. */
474     for (arg_i = 0; arg_i < argc; arg_i++) {
475         char     *stepper = strdup(argv[arg_i]);
476         char     *step_start = stepper;
477         char     *p;
478         char     *parsetime_error = NULL;
479         enum { atstyle, normal } timesyntax;
480         struct rrd_time_value ds_tv;
481
482         if (stepper == NULL) {
483             rrd_set_error("failed duplication argv entry");
484             free(step_start);
485             free(updvals);
486             free(pdp_temp);
487             free(tmpl_idx);
488             rrd_free(&rrd);
489 #ifdef HAVE_MMAP
490             rrd_close(rrd_file);
491 #endif
492             close(rrd_file->fd);
493             return (-1);
494         }
495         /* initialize all ds input to unknown except the first one
496            which has always got to be set */
497         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
498             updvals[ii] = "U";
499         updvals[0] = stepper;
500         /* separate all ds elements; first must be examined separately
501            due to alternate time syntax */
502         if ((p = strchr(stepper, '@')) != NULL) {
503             timesyntax = atstyle;
504             *p = '\0';
505             stepper = p + 1;
506         } else if ((p = strchr(stepper, ':')) != NULL) {
507             timesyntax = normal;
508             *p = '\0';
509             stepper = p + 1;
510         } else {
511             rrd_set_error
512                 ("expected timestamp not found in data source from %s",
513                  argv[arg_i]);
514             free(step_start);
515             break;
516         }
517         ii = 1;
518         updvals[tmpl_idx[ii]] = stepper;
519         while (*stepper) {
520             if (*stepper == ':') {
521                 *stepper = '\0';
522                 ii++;
523                 if (ii < tmpl_cnt) {
524                     updvals[tmpl_idx[ii]] = stepper + 1;
525                 }
526             }
527             stepper++;
528         }
529
530         if (ii != tmpl_cnt - 1) {
531             rrd_set_error
532                 ("expected %lu data source readings (got %lu) from %s",
533                  tmpl_cnt - 1, ii, argv[arg_i]);
534             free(step_start);
535             break;
536         }
537
538         /* get the time from the reading ... handle N */
539         if (timesyntax == atstyle) {
540             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
542                 free(step_start);
543                 break;
544             }
545             if (ds_tv.type == RELATIVE_TO_END_TIME ||
546                 ds_tv.type == RELATIVE_TO_START_TIME) {
547                 rrd_set_error("specifying time relative to the 'start' "
548                               "or 'end' makes no sense here: %s", updvals[0]);
549                 free(step_start);
550                 break;
551             }
552
553             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
554
555             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
556
557         } else if (strcmp(updvals[0], "N") == 0) {
558             gettimeofday(&tmp_time, 0);
559             normalize_time(&tmp_time);
560             current_time = tmp_time.tv_sec;
561             current_time_usec = tmp_time.tv_usec;
562         } else {
563             double    tmp;
564
565             tmp = strtod(updvals[0], 0);
566             current_time = floor(tmp);
567             current_time_usec =
568                 (long) ((tmp - (double) current_time) * 1000000.0);
569         }
570         /* dont do any correction for old version RRDs */
571         if (version < 3)
572             current_time_usec = 0;
573
574         if (current_time < rrd.live_head->last_up ||
575             (current_time == rrd.live_head->last_up &&
576              (long) current_time_usec <=
577              (long) rrd.live_head->last_up_usec)) {
578             rrd_set_error("illegal attempt to update using time %ld when "
579                           "last update time is %ld (minimum one second step)",
580                           current_time, rrd.live_head->last_up);
581             free(step_start);
582             break;
583         }
584
585
586         /* seek to the beginning of the rra's */
587         if (rra_current != rra_begin) {
588 #ifndef HAVE_MMAP
589             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
590                 rrd_set_error("seek error in rrd");
591                 free(step_start);
592                 break;
593             }
594 #endif
595             rra_current = rra_begin;
596         }
597         rra_start = rra_begin;
598
599         /* when was the current pdp started */
600         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
601         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
602
603         /* when did the last pdp_st occur */
604         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
605         occu_pdp_st = current_time - occu_pdp_age;
606
607         /* interval = current_time - rrd.live_head->last_up; */
608         interval = (double) (current_time - rrd.live_head->last_up)
609             + (double) ((long) current_time_usec -
610                         (long) rrd.live_head->last_up_usec) / 1000000.0;
611
612         if (occu_pdp_st > proc_pdp_st) {
613             /* OK we passed the pdp_st moment */
614             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
615                                                                      * occurred before the latest
616                                                                      * pdp_st moment*/
617             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
618             post_int = occu_pdp_age;    /* how much after it */
619             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
620         } else {
621             pre_int = interval;
622             post_int = 0;
623         }
624
625 #ifdef DEBUG
626         printf("proc_pdp_age %lu\t"
627                "proc_pdp_st %lu\t"
628                "occu_pfp_age %lu\t"
629                "occu_pdp_st %lu\t"
630                "int %lf\t"
631                "pre_int %lf\t"
632                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
633                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
634 #endif
635
636         /* process the data sources and update the pdp_prep 
637          * area accordingly */
638         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
639             enum dst_en dst_idx;
640
641             dst_idx = dst_conv(rrd.ds_def[i].dst);
642
643             /* make sure we do not build diffs with old last_ds values */
644             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
645                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
646                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
647             }
648
649             /* NOTE: DST_CDEF should never enter this if block, because
650              * updvals[i+1][0] is initialized to 'U'; unless the caller
651              * accidently specified a value for the DST_CDEF. To handle 
652              * this case, an extra check is required. */
653
654             if ((updvals[i + 1][0] != 'U') &&
655                 (dst_idx != DST_CDEF) &&
656                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
657                 double    rate = DNAN;
658
659                 /* the data source type defines how to process the data */
660                 /* pdp_new contains rate * time ... eg the bytes
661                  * transferred during the interval. Doing it this way saves
662                  * a lot of math operations */
663
664
665                 switch (dst_idx) {
666                 case DST_COUNTER:
667                 case DST_DERIVE:
668                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
669                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
670                             if ((updvals[i + 1][ii] < '0'
671                                  || updvals[i + 1][ii] > '9') && (ii != 0
672                                                                   && updvals[i
673                                                                              +
674                                                                              1]
675                                                                   [ii] !=
676                                                                   '-')) {
677                                 rrd_set_error("not a simple integer: '%s'",
678                                               updvals[i + 1]);
679                                 break;
680                             }
681                         }
682                         if (rrd_test_error()) {
683                             break;
684                         }
685                         pdp_new[i] =
686                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
687                         if (dst_idx == DST_COUNTER) {
688                             /* simple overflow catcher suggested by Andres Kroonmaa */
689                             /* this will fail terribly for non 32 or 64 bit counters ... */
690                             /* are there any others in SNMP land ? */
691                             if (pdp_new[i] < (double) 0.0)
692                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
693                             if (pdp_new[i] < (double) 0.0)
694                                 pdp_new[i] += (double) 18446744069414584320.0;
695                             /* 2^64-2^32 */ ;
696                         }
697                         rate = pdp_new[i] / interval;
698                     } else {
699                         pdp_new[i] = DNAN;
700                     }
701                     break;
702                 case DST_ABSOLUTE:
703                     errno = 0;
704                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
705                     if (errno > 0) {
706                         rrd_set_error("converting  '%s' to float: %s",
707                                       updvals[i + 1], rrd_strerror(errno));
708                         break;
709                     };
710                     if (endptr[0] != '\0') {
711                         rrd_set_error
712                             ("conversion of '%s' to float not complete: tail '%s'",
713                              updvals[i + 1], endptr);
714                         break;
715                     }
716                     rate = pdp_new[i] / interval;
717                     break;
718                 case DST_GAUGE:
719                     errno = 0;
720                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
721                     if (errno > 0) {
722                         rrd_set_error("converting  '%s' to float: %s",
723                                       updvals[i + 1], rrd_strerror(errno));
724                         break;
725                     };
726                     if (endptr[0] != '\0') {
727                         rrd_set_error
728                             ("conversion of '%s' to float not complete: tail '%s'",
729                              updvals[i + 1], endptr);
730                         break;
731                     }
732                     rate = pdp_new[i] / interval;
733                     break;
734                 default:
735                     rrd_set_error("rrd contains unknown DS type : '%s'",
736                                   rrd.ds_def[i].dst);
737                     break;
738                 }
739                 /* break out of this for loop if the error string is set */
740                 if (rrd_test_error()) {
741                     break;
742                 }
743                 /* make sure pdp_temp is neither too large or too small
744                  * if any of these occur it becomes unknown ...
745                  * sorry folks ... */
746                 if (!isnan(rate) &&
747                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
748                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
749                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
750                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
751                     pdp_new[i] = DNAN;
752                 }
753             } else {
754                 /* no news is news all the same */
755                 pdp_new[i] = DNAN;
756             }
757
758
759             /* make a copy of the command line argument for the next run */
760 #ifdef DEBUG
761             fprintf(stderr,
762                     "prep ds[%lu]\t"
763                     "last_arg '%s'\t"
764                     "this_arg '%s'\t"
765                     "pdp_new %10.2f\n",
766                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
767 #endif
768             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
769             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
770         }
771         /* break out of the argument parsing loop if the error_string is set */
772         if (rrd_test_error()) {
773             free(step_start);
774             break;
775         }
776         /* has a pdp_st moment occurred since the last run ? */
777
778         if (proc_pdp_st == occu_pdp_st) {
779             /* no we have not passed a pdp_st moment. therefore update is simple */
780
781             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
782                 if (isnan(pdp_new[i])) {
783                     /* this is not realy accurate if we use subsecond data arival time
784                        should have thought of it when going subsecond resolution ...
785                        sorry next format change we will have it! */
786                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
787                         floor(interval);
788                 } else {
789                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
790                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
791                     } else {
792                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
793                     }
794                 }
795 #ifdef DEBUG
796                 fprintf(stderr,
797                         "NO PDP  ds[%lu]\t"
798                         "value %10.2f\t"
799                         "unkn_sec %5lu\n",
800                         i,
801                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
802                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
803 #endif
804             }
805         } else {
806             /* an pdp_st has occurred. */
807
808             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
809              * occurred up to the last run.        
810              pdp_new[] contains rate*seconds from the latest run.
811              pdp_temp[] will contain the rate for cdp */
812
813             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
814                 /* update pdp_prep to the current pdp_st. */
815                 double    pre_unknown = 0.0;
816
817                 if (isnan(pdp_new[i]))
818                     /* a final bit of unkonwn to be added bevore calculation
819                      * we use a tempaorary variable for this so that we 
820                      * don't have to turn integer lines before using the value */
821                     pre_unknown = pre_int;
822                 else {
823                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
824                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
825                             pdp_new[i] / interval * pre_int;
826                     } else {
827                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
828                             pdp_new[i] / interval * pre_int;
829                     }
830                 }
831
832
833                 /* if too much of the pdp_prep is unknown we dump it */
834                 if (
835                        /* removed because this does not agree with the definition
836                           a heart beat can be unknown */
837                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
838                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
839                        /* if the interval is larger than mrhb we get NAN */
840                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
841                        (occu_pdp_st - proc_pdp_st <=
842                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
843                     pdp_temp[i] = DNAN;
844                 } else {
845                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
846                         / ((double) (occu_pdp_st - proc_pdp_st
847                                      -
848                                      rrd.pdp_prep[i].
849                                      scratch[PDP_unkn_sec_cnt].u_cnt)
850                            - pre_unknown);
851                 }
852
853                 /* process CDEF data sources; remember each CDEF DS can
854                  * only reference other DS with a lower index number */
855                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
856                     rpnp_t   *rpnp;
857
858                     rpnp =
859                         rpn_expand((rpn_cdefds_t *) &
860                                    (rrd.ds_def[i].par[DS_cdef]));
861                     /* substitute data values for OP_VARIABLE nodes */
862                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
863                         if (rpnp[ii].op == OP_VARIABLE) {
864                             rpnp[ii].op = OP_NUMBER;
865                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
866                         }
867                     }
868                     /* run the rpn calculator */
869                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
870                         free(rpnp);
871                         break;  /* exits the data sources pdp_temp loop */
872                     }
873                 }
874
875                 /* make pdp_prep ready for the next run */
876                 if (isnan(pdp_new[i])) {
877                     /* this is not really accurate if we use subsecond data arrival time
878                        should have thought of it when going subsecond resolution ...
879                        sorry next format change we will have it! */
880                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
881                         floor(post_int);
882                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
883                 } else {
884                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
885                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
886                         pdp_new[i] / interval * post_int;
887                 }
888
889 #ifdef DEBUG
890                 fprintf(stderr,
891                         "PDP UPD ds[%lu]\t"
892                         "pdp_temp %10.2f\t"
893                         "new_prep %10.2f\t"
894                         "new_unkn_sec %5lu\n",
895                         i, pdp_temp[i],
896                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
897                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
898 #endif
899             }
900
901             /* if there were errors during the last loop, bail out here */
902             if (rrd_test_error()) {
903                 free(step_start);
904                 break;
905             }
906
907             /* compute the number of elapsed pdp_st moments */
908             elapsed_pdp_st =
909                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
910 #ifdef DEBUG
911             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
912 #endif
913             if (rra_step_cnt == NULL) {
914                 rra_step_cnt = (unsigned long *)
915                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
916             }
917
918             for (i = 0, rra_start = rra_begin;
919                  i < rrd.stat_head->rra_cnt;
920                  rra_start +=
921                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
922                  sizeof(rrd_value_t), i++) {
923                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
924                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
925                     (proc_pdp_st / rrd.stat_head->pdp_step) %
926                     rrd.rra_def[i].pdp_cnt;
927                 if (start_pdp_offset <= elapsed_pdp_st) {
928                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
929                         rrd.rra_def[i].pdp_cnt + 1;
930                 } else {
931                     rra_step_cnt[i] = 0;
932                 }
933
934                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
935                     /* If this is a bulk update, we need to skip ahead in the seasonal
936                      * arrays so that they will be correct for the next observed value;
937                      * note that for the bulk update itself, no update will occur to
938                      * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
939                      * be set to DNAN. */
940                     if (rra_step_cnt[i] > 2) {
941                         /* skip update by resetting rra_step_cnt[i],
942                          * note that this is not data source specific; this is due
943                          * to the bulk update, not a DNAN value for the specific data
944                          * source. */
945                         rra_step_cnt[i] = 0;
946                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
947                                         elapsed_pdp_st, &last_seasonal_coef);
948                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
949                                         elapsed_pdp_st + 1, &seasonal_coef);
950                     }
951
952                     /* periodically run a smoother for seasonal effects */
953                     /* Need to use first cdp parameter buffer to track
954                      * burnin (burnin requires a specific smoothing schedule).
955                      * The CDP_init_seasonal parameter is really an RRA level,
956                      * not a data source within RRA level parameter, but the rra_def
957                      * is read only for rrd_update (not flushed to disk). */
958                     iii = i * (rrd.stat_head->ds_cnt);
959                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
960                         <= BURNIN_CYCLES) {
961                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
962                             > rrd.rra_def[i].row_cnt - 1) {
963                             /* mark off one of the burnin cycles */
964                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
965                                u_cnt);
966                             schedule_smooth = 1;
967                         }
968                     } else {
969                         /* someone has no doubt invented a trick to deal with this
970                          * wrap around, but at least this code is clear. */
971                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
972                             u_cnt > rrd.rra_ptr[i].cur_row) {
973                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
974                              * mapping between PDP and CDP */
975                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
976                                 >=
977                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
978                                 u_cnt) {
979 #ifdef DEBUG
980                                 fprintf(stderr,
981                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
982                                         rrd.rra_ptr[i].cur_row,
983                                         elapsed_pdp_st,
984                                         rrd.rra_def[i].
985                                         par[RRA_seasonal_smooth_idx].u_cnt);
986 #endif
987                                 schedule_smooth = 1;
988                             }
989                         } else {
990                             /* can't rely on negative numbers because we are working with
991                              * unsigned values */
992                             /* Don't need modulus here. If we've wrapped more than once, only
993                              * one smooth is executed at the end. */
994                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
995                                 rrd.rra_def[i].row_cnt
996                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
997                                 rrd.rra_def[i].row_cnt >=
998                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
999                                 u_cnt) {
1000 #ifdef DEBUG
1001                                 fprintf(stderr,
1002                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1003                                         rrd.rra_ptr[i].cur_row,
1004                                         elapsed_pdp_st,
1005                                         rrd.rra_def[i].
1006                                         par[RRA_seasonal_smooth_idx].u_cnt);
1007 #endif
1008                                 schedule_smooth = 1;
1009                             }
1010                         }
1011                     }
1012
1013                     rra_current = rrd_tell(rrd_file);
1014                 }
1015                 /* if cf is DEVSEASONAL or SEASONAL */
1016                 if (rrd_test_error())
1017                     break;
1018
1019                 /* update CDP_PREP areas */
1020                 /* loop over data sources within each RRA */
1021                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1022
1023                     /* iii indexes the CDP prep area for this data source within the RRA */
1024                     iii = i * rrd.stat_head->ds_cnt + ii;
1025
1026                     if (rrd.rra_def[i].pdp_cnt > 1) {
1027
1028                         if (rra_step_cnt[i] > 0) {
1029                             /* If we are in this block, at least 1 CDP value will be written to
1030                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
1031                              * to be written, then the "fill in" value is the CDP_secondary_val
1032                              * entry. */
1033                             if (isnan(pdp_temp[ii])) {
1034                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1035                                     u_cnt += start_pdp_offset;
1036                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1037                                     u_val = DNAN;
1038                             } else {
1039                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
1040                                  * CDP data entries. No matter the CF, the value is the same because
1041                                  * the average, max, min, and last of a list of identical values is
1042                                  * the same, namely, the value itself. */
1043                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1044                                     u_val = pdp_temp[ii];
1045                             }
1046
1047                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1048                                 u_cnt >
1049                                 rrd.rra_def[i].pdp_cnt *
1050                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1051                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1052                                     u_val = DNAN;
1053                                 /* initialize carry over */
1054                                 if (current_cf == CF_AVERAGE) {
1055                                     if (isnan(pdp_temp[ii])) {
1056                                         rrd.cdp_prep[iii].scratch[CDP_val].
1057                                             u_val = DNAN;
1058                                     } else {
1059                                         rrd.cdp_prep[iii].scratch[CDP_val].
1060                                             u_val =
1061                                             pdp_temp[ii] *
1062                                             ((elapsed_pdp_st -
1063                                               start_pdp_offset) %
1064                                              rrd.rra_def[i].pdp_cnt);
1065                                     }
1066                                 } else {
1067                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1068                                         pdp_temp[ii];
1069                                 }
1070                             } else {
1071                                 rrd_value_t cum_val, cur_val;
1072
1073                                 switch (current_cf) {
1074                                 case CF_AVERAGE:
1075                                     cum_val =
1076                                         IFDNAN(rrd.cdp_prep[iii].
1077                                                scratch[CDP_val].u_val, 0.0);
1078                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1079                                     rrd.cdp_prep[iii].
1080                                         scratch[CDP_primary_val].u_val =
1081                                         (cum_val +
1082                                          cur_val * start_pdp_offset) /
1083                                         (rrd.rra_def[i].pdp_cnt -
1084                                          rrd.cdp_prep[iii].
1085                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1086                                     /* initialize carry over value */
1087                                     if (isnan(pdp_temp[ii])) {
1088                                         rrd.cdp_prep[iii].scratch[CDP_val].
1089                                             u_val = DNAN;
1090                                     } else {
1091                                         rrd.cdp_prep[iii].scratch[CDP_val].
1092                                             u_val =
1093                                             pdp_temp[ii] *
1094                                             ((elapsed_pdp_st -
1095                                               start_pdp_offset) %
1096                                              rrd.rra_def[i].pdp_cnt);
1097                                     }
1098                                     break;
1099                                 case CF_MAXIMUM:
1100                                     cum_val =
1101                                         IFDNAN(rrd.cdp_prep[iii].
1102                                                scratch[CDP_val].u_val, -DINF);
1103                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1104 #ifdef DEBUG
1105                                     if (isnan
1106                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1107                                          u_val) && isnan(pdp_temp[ii])) {
1108                                         fprintf(stderr,
1109                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1110                                                 i, ii);
1111                                         exit(-1);
1112                                     }
1113 #endif
1114                                     if (cur_val > cum_val)
1115                                         rrd.cdp_prep[iii].
1116                                             scratch[CDP_primary_val].u_val =
1117                                             cur_val;
1118                                     else
1119                                         rrd.cdp_prep[iii].
1120                                             scratch[CDP_primary_val].u_val =
1121                                             cum_val;
1122                                     /* initialize carry over value */
1123                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1124                                         pdp_temp[ii];
1125                                     break;
1126                                 case CF_MINIMUM:
1127                                     cum_val =
1128                                         IFDNAN(rrd.cdp_prep[iii].
1129                                                scratch[CDP_val].u_val, DINF);
1130                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1131 #ifdef DEBUG
1132                                     if (isnan
1133                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1134                                          u_val) && isnan(pdp_temp[ii])) {
1135                                         fprintf(stderr,
1136                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1137                                                 i, ii);
1138                                         exit(-1);
1139                                     }
1140 #endif
1141                                     if (cur_val < cum_val)
1142                                         rrd.cdp_prep[iii].
1143                                             scratch[CDP_primary_val].u_val =
1144                                             cur_val;
1145                                     else
1146                                         rrd.cdp_prep[iii].
1147                                             scratch[CDP_primary_val].u_val =
1148                                             cum_val;
1149                                     /* initialize carry over value */
1150                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1151                                         pdp_temp[ii];
1152                                     break;
1153                                 case CF_LAST:
1154                                 default:
1155                                     rrd.cdp_prep[iii].
1156                                         scratch[CDP_primary_val].u_val =
1157                                         pdp_temp[ii];
1158                                     /* initialize carry over value */
1159                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1160                                         pdp_temp[ii];
1161                                     break;
1162                                 }
1163                             }   /* endif meets xff value requirement for a valid value */
1164                             /* initialize carry over CDP_unkn_pdp_cnt; this must happen after CDP_primary_val
1165                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1166                             if (isnan(pdp_temp[ii]))
1167                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1168                                     u_cnt =
1169                                     (elapsed_pdp_st -
1170                                      start_pdp_offset) %
1171                                     rrd.rra_def[i].pdp_cnt;
1172                             else
1173                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1174                                     u_cnt = 0;
1175                         } else {    /* rra_step_cnt[i]  == 0 */
1176
1177 #ifdef DEBUG
1178                             if (isnan
1179                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1180                                 fprintf(stderr,
1181                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1182                                         i, ii);
1183                             } else {
1184                                 fprintf(stderr,
1185                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1186                                         i, ii,
1187                                         rrd.cdp_prep[iii].scratch[CDP_val].
1188                                         u_val);
1189                             }
1190 #endif
1191                             if (isnan(pdp_temp[ii])) {
1192                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1193                                     u_cnt += elapsed_pdp_st;
1194                             } else
1195                                 if (isnan
1196                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1197                                      u_val)) {
1198                                 if (current_cf == CF_AVERAGE) {
1199                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1200                                         pdp_temp[ii] * elapsed_pdp_st;
1201                                 } else {
1202                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1203                                         pdp_temp[ii];
1204                                 }
1205 #ifdef DEBUG
1206                                 fprintf(stderr,
1207                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1208                                         i, ii,
1209                                         rrd.cdp_prep[iii].scratch[CDP_val].
1210                                         u_val);
1211 #endif
1212                             } else {
1213                                 switch (current_cf) {
1214                                 case CF_AVERAGE:
1215                                     rrd.cdp_prep[iii].scratch[CDP_val].
1216                                         u_val +=
1217                                         pdp_temp[ii] * elapsed_pdp_st;
1218                                     break;
1219                                 case CF_MINIMUM:
1220                                     if (pdp_temp[ii] <
1221                                         rrd.cdp_prep[iii].scratch[CDP_val].
1222                                         u_val)
1223                                         rrd.cdp_prep[iii].scratch[CDP_val].
1224                                             u_val = pdp_temp[ii];
1225                                     break;
1226                                 case CF_MAXIMUM:
1227                                     if (pdp_temp[ii] >
1228                                         rrd.cdp_prep[iii].scratch[CDP_val].
1229                                         u_val)
1230                                         rrd.cdp_prep[iii].scratch[CDP_val].
1231                                             u_val = pdp_temp[ii];
1232                                     break;
1233                                 case CF_LAST:
1234                                 default:
1235                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1236                                         pdp_temp[ii];
1237                                     break;
1238                                 }
1239                             }
1240                         }
1241                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1242                         if (elapsed_pdp_st > 2) {
1243                             switch (current_cf) {
1244                             case CF_AVERAGE:
1245                             default:
1246                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1247                                     u_val = pdp_temp[ii];
1248                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1249                                     u_val = pdp_temp[ii];
1250                                 break;
1251                             case CF_SEASONAL:
1252                             case CF_DEVSEASONAL:
1253                                 /* need to update cached seasonal values, so they are consistent
1254                                  * with the bulk update */
1255                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1256                                  * CDP_last_deviation are the same. */
1257                                 rrd.cdp_prep[iii].
1258                                     scratch[CDP_hw_last_seasonal].u_val =
1259                                     last_seasonal_coef[ii];
1260                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1261                                     u_val = seasonal_coef[ii];
1262                                 break;
1263                             case CF_HWPREDICT:
1264                                 /* need to update the null_count and last_null_count.
1265                                  * even do this for non-DNAN pdp_temp because the
1266                                  * algorithm is not learning from batch updates. */
1267                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1268                                     u_cnt += elapsed_pdp_st;
1269                                 rrd.cdp_prep[iii].
1270                                     scratch[CDP_last_null_count].u_cnt +=
1271                                     elapsed_pdp_st - 1;
1272                                 /* fall through */
1273                             case CF_DEVPREDICT:
1274                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1275                                     u_val = DNAN;
1276                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1277                                     u_val = DNAN;
1278                                 break;
1279                             case CF_FAILURES:
1280                                 /* do not count missed bulk values as failures */
1281                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1282                                     u_val = 0;
1283                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1284                                     u_val = 0;
1285                                 /* need to reset violations buffer.
1286                                  * could do this more carefully, but for now, just
1287                                  * assume a bulk update wipes away all violations. */
1288                                 erase_violations(&rrd, iii, i);
1289                                 break;
1290                             }
1291                         }
1292                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1293
1294                     if (rrd_test_error())
1295                         break;
1296
1297                 }       /* endif data sources loop */
1298             }           /* end RRA Loop */
1299
1300             /* this loop is only entered if elapsed_pdp_st < 3 */
1301             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1302                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1303                 for (i = 0, rra_start = rra_begin;
1304                      i < rrd.stat_head->rra_cnt;
1305                      rra_start +=
1306                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1307                      sizeof(rrd_value_t), i++) {
1308                     if (rrd.rra_def[i].pdp_cnt > 1)
1309                         continue;
1310
1311                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1312                     if (current_cf == CF_SEASONAL
1313                         || current_cf == CF_DEVSEASONAL) {
1314                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1315                                         elapsed_pdp_st + (scratch_idx ==
1316                                                           CDP_primary_val ? 1
1317                                                           : 2),
1318                                         &seasonal_coef);
1319                         rra_current = rrd_tell(rrd_file);
1320                     }
1321                     if (rrd_test_error())
1322                         break;
1323                     /* loop over data sources within each RRA */
1324                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1325                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1326                                            i * (rrd.stat_head->ds_cnt) + ii,
1327                                            i, ii, scratch_idx, seasonal_coef);
1328                     }
1329                 }       /* end RRA Loop */
1330                 if (rrd_test_error())
1331                     break;
1332             }           /* end elapsed_pdp_st loop */
1333
1334             if (rrd_test_error())
1335                 break;
1336
1337             /* Ready to write to disk */
1338             /* Move sequentially through the file, writing one RRA at a time.
1339              * Note this architecture divorces the computation of CDP with
1340              * flushing updated RRA entries to disk. */
1341             for (i = 0, rra_start = rra_begin;
1342                  i < rrd.stat_head->rra_cnt;
1343                  rra_start +=
1344                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1345                  sizeof(rrd_value_t), i++) {
1346                 /* is there anything to write for this RRA? If not, continue. */
1347                 if (rra_step_cnt[i] == 0)
1348                     continue;
1349
1350                 /* write the first row */
1351 #ifdef DEBUG
1352                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1353 #endif
1354                 rrd.rra_ptr[i].cur_row++;
1355                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1356                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1357                 /* position on the first row */
1358                 rra_pos_tmp = rra_start +
1359                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1360                     sizeof(rrd_value_t);
1361                 if (rra_pos_tmp != rra_current) {
1362 #ifndef HAVE_MMAP
1363                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1364                         rrd_set_error("seek error in rrd");
1365                         break;
1366                     }
1367 #endif
1368                     rra_current = rra_pos_tmp;
1369                 }
1370 #ifdef DEBUG
1371                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1372 #endif
1373                 scratch_idx = CDP_primary_val;
1374                 if (pcdp_summary != NULL) {
1375                     rra_time = (current_time - current_time
1376                                 % (rrd.rra_def[i].pdp_cnt *
1377                                    rrd.stat_head->pdp_step))
1378                         -
1379                         ((rra_step_cnt[i] -
1380                           1) * rrd.rra_def[i].pdp_cnt *
1381                          rrd.stat_head->pdp_step);
1382                 }
1383 #ifdef HAVE_MMAP
1384                 pcdp_summary =
1385                     write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1386                                   rrd_file->fd, pcdp_summary, &rra_time,
1387                                   rrd_file->file_start);
1388 #else
1389                 pcdp_summary =
1390                     write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1391                                   rrd_file->fd, pcdp_summary, &rra_time);
1392 #endif
1393                 if (rrd_test_error())
1394                     break;
1395
1396                 /* write other rows of the bulk update, if any */
1397                 scratch_idx = CDP_secondary_val;
1398                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1399                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1400 #ifdef DEBUG
1401                         fprintf(stderr,
1402                                 "Wraparound for RRA %s, %lu updates left\n",
1403                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1404 #endif
1405                         /* wrap */
1406                         rrd.rra_ptr[i].cur_row = 0;
1407                         /* seek back to beginning of current rra */
1408                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1409                             rrd_set_error("seek error in rrd");
1410                             break;
1411                         }
1412 #ifdef DEBUG
1413                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1414                                 rrd_file->pos);
1415 #endif
1416                         rra_current = rra_start;
1417                     }
1418                     if (pcdp_summary != NULL) {
1419                         rra_time = (current_time - current_time
1420                                     % (rrd.rra_def[i].pdp_cnt *
1421                                        rrd.stat_head->pdp_step))
1422                             -
1423                             ((rra_step_cnt[i] -
1424                               2) * rrd.rra_def[i].pdp_cnt *
1425                              rrd.stat_head->pdp_step);
1426                     }
1427 #ifdef HAVE_MMAP
1428                     pcdp_summary =
1429                         write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1430                                       rrd_file->fd, pcdp_summary, &rra_time,
1431                                       rrd_file->file_start);
1432 #else
1433                     pcdp_summary =
1434                         write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1435                                       rrd_file->fd, pcdp_summary, &rra_time);
1436 #endif
1437                 }
1438
1439                 if (rrd_test_error())
1440                     break;
1441             }           /* RRA LOOP */
1442
1443             /* break out of the argument parsing loop if error_string is set */
1444             if (rrd_test_error()) {
1445                 free(step_start);
1446                 break;
1447             }
1448
1449         }               /* endif a pdp_st has occurred */
1450         rrd.live_head->last_up = current_time;
1451         rrd.live_head->last_up_usec = current_time_usec;
1452         free(step_start);
1453     }                   /* function argument loop */
1454
1455     if (seasonal_coef != NULL)
1456         free(seasonal_coef);
1457     if (last_seasonal_coef != NULL)
1458         free(last_seasonal_coef);
1459     if (rra_step_cnt != NULL)
1460         free(rra_step_cnt);
1461     rpnstack_free(&rpnstack);
1462
1463 #ifdef HAVE_MMAP
1464     if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1465         rrd_set_error("error writing(unmapping) file: %s", filename);
1466     }
1467 #endif
1468     /* if we got here and if there is an error and if the file has not been
1469      * written to, then close things up and return. */
1470     if (rrd_test_error()) {
1471         free(updvals);
1472         free(tmpl_idx);
1473         rrd_free(&rrd);
1474         free(pdp_temp);
1475         free(pdp_new);
1476         close(rrd_file->fd);
1477         return (-1);
1478     }
1479
1480     /* aargh ... that was tough ... so many loops ... anyway, its done.
1481      * we just need to write back the live header portion now*/
1482
1483     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1484                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1485                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1486                  SEEK_SET) != 0) {
1487         rrd_set_error("seek rrd for live header writeback");
1488         free(updvals);
1489         free(tmpl_idx);
1490         rrd_free(&rrd);
1491         free(pdp_temp);
1492         free(pdp_new);
1493         close(rrd_file->fd);
1494         return (-1);
1495     }
1496
1497     if (version >= 3) {
1498         if (rrd_write(rrd_file, rrd.live_head,
1499                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1500             rrd_set_error("rrd_write live_head to rrd");
1501             free(updvals);
1502             rrd_free(&rrd);
1503             free(tmpl_idx);
1504             free(pdp_temp);
1505             free(pdp_new);
1506             close(rrd_file->fd);
1507             return (-1);
1508         }
1509     } else {
1510         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1511                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1512             rrd_set_error("rrd_write live_head to rrd");
1513             free(updvals);
1514             rrd_free(&rrd);
1515             free(tmpl_idx);
1516             free(pdp_temp);
1517             free(pdp_new);
1518             close(rrd_file->fd);
1519             return (-1);
1520         }
1521     }
1522
1523
1524     if (rrd_write(rrd_file, rrd.pdp_prep,
1525                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1526         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1527         rrd_set_error("rrd_write pdp_prep to rrd");
1528         free(updvals);
1529         rrd_free(&rrd);
1530         free(tmpl_idx);
1531         free(pdp_temp);
1532         free(pdp_new);
1533         close(rrd_file->fd);
1534         return (-1);
1535     }
1536
1537     if (rrd_write(rrd_file, rrd.cdp_prep,
1538                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539                   rrd.stat_head->ds_cnt)
1540         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1541                       rrd.stat_head->ds_cnt)) {
1542
1543         rrd_set_error("rrd_write cdp_prep to rrd");
1544         free(updvals);
1545         free(tmpl_idx);
1546         rrd_free(&rrd);
1547         free(pdp_temp);
1548         free(pdp_new);
1549         close(rrd_file->fd);
1550         return (-1);
1551     }
1552
1553     if (rrd_write(rrd_file, rrd.rra_ptr,
1554                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1555         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1556         rrd_set_error("rrd_write rra_ptr to rrd");
1557         free(updvals);
1558         free(tmpl_idx);
1559         rrd_free(&rrd);
1560         free(pdp_temp);
1561         free(pdp_new);
1562         close(rrd_file->fd);
1563         return (-1);
1564     }
1565 #ifdef HAVE_POSIX_FADVISExxx
1566
1567     /* with update we have write ops, so they will probably not be done by now, this means
1568        the buffers will not get freed. But calling this for the whole file - header
1569        will let the data off the hook as soon as it is written when if it is from a previous
1570        update cycle. Calling fdsync to force things is much too hard here. */
1571
1572     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1573         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1574                       rrd_strerror(errno));
1575         close(rrd_file->fd);
1576         return (-1);
1577     }
1578 #endif
1579     /*XXX: ? */ rrd_flush(rrd_file);
1580
1581     /* calling the smoothing code here guarantees at most
1582      * one smoothing operation per rrd_update call. Unfortunately,
1583      * it is possible with bulk updates, or a long-delayed update
1584      * for smoothing to occur off-schedule. This really isn't
1585      * critical except during the burning cycles. */
1586     if (schedule_smooth) {
1587 //    in_file = fopen(filename,"rb+");
1588
1589
1590         rra_start = rra_begin;
1591         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1592             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1593                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1594 #ifdef DEBUG
1595                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1596 #endif
1597                 apply_smoother(&rrd, i, rra_start, rrd_file);
1598                 if (rrd_test_error())
1599                     break;
1600             }
1601             rra_start += rrd.rra_def[i].row_cnt
1602                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1603         }
1604 #ifdef HAVE_POSIX_FADVISExxx
1605         /* same procedure as above ... */
1606         if (0 !=
1607             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1608             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1609                           rrd_strerror(errno));
1610             close(rrd_file->fd);
1611             return (-1);
1612         }
1613 #endif
1614         close(rrd_file->fd);
1615     }
1616
1617     /* OK now close the files and free the memory */
1618     if (close(rrd_file->fd) != 0) {
1619         rrd_set_error("closing rrd");
1620         free(updvals);
1621         free(tmpl_idx);
1622         rrd_free(&rrd);
1623         free(pdp_temp);
1624         free(pdp_new);
1625         return (-1);
1626     }
1627
1628     rrd_free(&rrd);
1629     free(updvals);
1630     free(tmpl_idx);
1631     free(pdp_new);
1632     free(pdp_temp);
1633     return (0);
1634 }
1635
/*
 * Request an exclusive lock on the whole file behind the descriptor
 * in_file. The request does not wait: if the lock cannot be granted
 * right away the call fails instead of blocking.
 * The lock gets removed when we close the file.
 *
 * On native Windows builds the file size is looked up with _fstat() and
 * that many bytes are locked with _locking(); everywhere else a write
 * lock is requested through fcntl(F_SETLK).
 *
 * returns 0 on success, non-zero (-1) on failure
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() takes an explicit byte count, so the size is needed */
    if (_fstat(in_file, &st) == 0) {
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    /* zero every member so that fields we do not set explicitly (l_pid
     * and any platform specific extras) are not handed over as garbage */
    struct flock lock = { 0 };

    lock.l_type = F_WRLCK;  /* exclusive write lock */
    lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    lock.l_start = 0;   /* begin at the first byte ... */
    lock.l_len = 0; /* ... and cover everything up to end of file */

    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}
1670
1671
1672 #ifdef HAVE_MMAP
1673 info_t
1674
1675
1676
1677
1678
1679
1680
1681
1682          *write_RRA_row(
1683     rrd_t *rrd,
1684     unsigned long rra_idx,
1685     unsigned long *rra_current,
1686     unsigned short CDP_scratch_idx,
1687 #ifndef DEBUG
1688     int UNUSED(in_file),
1689 #else
1690     int in_file,
1691 #endif
1692     info_t *pcdp_summary,
1693     time_t *rra_time,
1694     void *rrd_mmaped_file)
1695 #else
1696 info_t
1697
1698
1699
1700
1701
1702
1703
1704
1705          *write_RRA_row(
1706     rrd_t *rrd,
1707     unsigned long rra_idx,
1708     unsigned long *rra_current,
1709     unsigned short CDP_scratch_idx,
1710     int in_file,
1711     info_t *pcdp_summary,
1712     time_t *rra_time)
1713 #endif
1714 {
1715     unsigned long ds_idx, cdp_idx;
1716     infoval   iv;
1717
1718     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1719         /* compute the cdp index */
1720         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1721 #ifdef DEBUG
1722         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1723                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1724                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1725 #endif
1726         if (pcdp_summary != NULL) {
1727             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1728             /* append info to the return hash */
1729             pcdp_summary = info_push(pcdp_summary,
1730                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1731                                                    *rra_time,
1732                                                    rrd->rra_def[rra_idx].
1733                                                    cf_nam,
1734                                                    rrd->rra_def[rra_idx].
1735                                                    pdp_cnt,
1736                                                    rrd->ds_def[ds_idx].
1737                                                    ds_nam), RD_I_VAL, iv);
1738         }
1739 #ifdef HAVE_MMAP
1740         memcpy((char *) rrd_mmaped_file + *rra_current,
1741                &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1742                sizeof(rrd_value_t));
1743 #else
1744         if (rrd_write
1745             (rrd_file,
1746              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1747              sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1748             rrd_set_error("writing rrd");
1749             return 0;
1750         }
1751 #endif
1752         *rra_current += sizeof(rrd_value_t);
1753     }
1754     return (pcdp_summary);
1755 }