reindented everything according to .indent.pro
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
63 /*
64  * normilize time as returned by gettimeofday. usec part must
65  * be always >= 0
66  */
67 static void normalize_time(
68     struct timeval *t)
69 {
70     if (t->tv_usec < 0) {
71         t->tv_sec--;
72         t->tv_usec += 1000000L;
73     }
74 }
75
76 /* Local prototypes */
77 int       LockRRD(
78     int in_file);
79
80 #ifdef HAVE_MMAP
81 info_t   *write_RRA_row(
82     rrd_t *rrd,
83     unsigned long rra_idx,
84     unsigned long *rra_current,
85     unsigned short CDP_scratch_idx,
86 #ifndef DEBUG
87     int UNUSED(in_file),
88 #else
89     int in_file,
90 #endif
91     info_t *pcdp_summary,
92     time_t *rra_time,
93     void *rrd_mmaped_file);
94 #else
95 info_t   *write_RRA_row(
96     rrd_t *rrd,
97     unsigned long rra_idx,
98     unsigned long *rra_current,
99     unsigned short CDP_scratch_idx,
100     int in_file,
101     info_t *pcdp_summary,
102     time_t *rra_time);
103 #endif
104 int       rrd_update_r(
105     const char *filename,
106     const char *tmplt,
107     int argc,
108     const char **argv);
109 int       _rrd_update(
110     const char *filename,
111     const char *tmplt,
112     int argc,
113     const char **argv,
114     info_t *);
115
116 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
117
118
119 info_t   *rrd_update_v(
120     int argc,
121     char **argv)
122 {
123     char     *tmplt = NULL;
124     info_t   *result = NULL;
125     infoval   rc;
126
127     rc.u_int = -1;
128     optind = 0;
129     opterr = 0;         /* initialize getopt */
130
131     while (1) {
132         static struct option long_options[] = {
133             {"template", required_argument, 0, 't'},
134             {0, 0, 0, 0}
135         };
136         int       option_index = 0;
137         int       opt;
138
139         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
140
141         if (opt == EOF)
142             break;
143
144         switch (opt) {
145         case 't':
146             tmplt = optarg;
147             break;
148
149         case '?':
150             rrd_set_error("unknown option '%s'", argv[optind - 1]);
151             goto end_tag;
152         }
153     }
154
155     /* need at least 2 arguments: filename, data. */
156     if (argc - optind < 2) {
157         rrd_set_error("Not enough arguments");
158         goto end_tag;
159     }
160     rc.u_int = 0;
161     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162     rc.u_int = _rrd_update(argv[optind], tmplt,
163                            argc - optind - 1,
164                            (const char **) (argv + optind + 1), result);
165     result->value.u_int = rc.u_int;
166   end_tag:
167     return result;
168 }
169
170 int rrd_update(
171     int argc,
172     char **argv)
173 {
174     char     *tmplt = NULL;
175     int       rc;
176
177     optind = 0;
178     opterr = 0;         /* initialize getopt */
179
180     while (1) {
181         static struct option long_options[] = {
182             {"template", required_argument, 0, 't'},
183             {0, 0, 0, 0}
184         };
185         int       option_index = 0;
186         int       opt;
187
188         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
189
190         if (opt == EOF)
191             break;
192
193         switch (opt) {
194         case 't':
195             tmplt = optarg;
196             break;
197
198         case '?':
199             rrd_set_error("unknown option '%s'", argv[optind - 1]);
200             return (-1);
201         }
202     }
203
204     /* need at least 2 arguments: filename, data. */
205     if (argc - optind < 2) {
206         rrd_set_error("Not enough arguments");
207
208         return -1;
209     }
210
211     rc = rrd_update_r(argv[optind], tmplt,
212                       argc - optind - 1, (const char **) (argv + optind + 1));
213     return rc;
214 }
215
216 int rrd_update_r(
217     const char *filename,
218     const char *tmplt,
219     int argc,
220     const char **argv)
221 {
222     return _rrd_update(filename, tmplt, argc, argv, NULL);
223 }
224
225 int _rrd_update(
226     const char *filename,
227     const char *tmplt,
228     int argc,
229     const char **argv,
230     info_t *pcdp_summary)
231 {
232
233     int       arg_i = 2;
234     short     j;
235     unsigned long i, ii, iii = 1;
236
237     unsigned long rra_begin;    /* byte pointer to the rra
238                                  * area in the rrd file.  this
239                                  * pointer never changes value */
240     unsigned long rra_start;    /* byte pointer to the rra
241                                  * area in the rrd file.  this
242                                  * pointer changes as each rrd is
243                                  * processed. */
244     unsigned long rra_current;  /* byte pointer to the current write
245                                  * spot in the rrd file. */
246     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
247     double    interval, pre_int, post_int;  /* interval between this and
248                                              * the last run */
249     unsigned long proc_pdp_st;  /* which pdp_st was the last
250                                  * to be processed */
251     unsigned long occu_pdp_st;  /* when was the pdp_st
252                                  * before the last update
253                                  * time */
254     unsigned long proc_pdp_age; /* how old was the data in
255                                  * the pdp prep area when it
256                                  * was last updated */
257     unsigned long occu_pdp_age; /* how long ago was the last
258                                  * pdp_step time */
259     rrd_value_t *pdp_new;   /* prepare the incoming data
260                              * to be added the the
261                              * existing entry */
262     rrd_value_t *pdp_temp;  /* prepare the pdp values 
263                              * to be added the the
264                              * cdp values */
265
266     long     *tmpl_idx; /* index representing the settings
267                            transported by the tmplt index */
268     unsigned long tmpl_cnt = 2; /* time and data */
269
270     rrd_t     rrd;
271     time_t    current_time = 0;
272     time_t    rra_time = 0; /* time of update for a RRA */
273     unsigned long current_time_usec = 0;    /* microseconds part of current time */
274     struct timeval tmp_time;    /* used for time conversion */
275
276     char    **updvals;
277     int       schedule_smooth = 0;
278     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
279
280     /* a vector of future Holt-Winters seasonal coefs */
281     unsigned long elapsed_pdp_st;
282
283     /* number of elapsed PDP steps since last update */
284     unsigned long *rra_step_cnt = NULL;
285
286     /* number of rows to be updated in an RRA for a data
287      * value. */
288     unsigned long start_pdp_offset;
289
290     /* number of PDP steps since the last update that
291      * are assigned to the first CDP to be generated
292      * since the last update. */
293     unsigned short scratch_idx;
294
295     /* index into the CDP scratch array */
296     enum cf_en current_cf;
297
298     /* numeric id of the current consolidation function */
299     rpnstack_t rpnstack;    /* used for COMPUTE DS */
300     int       version;  /* rrd version */
301     char     *endptr;   /* used in the conversion */
302     rrd_file_t *rrd_file;
303
304     rpnstack_init(&rpnstack);
305
306     /* need at least 1 arguments: data. */
307     if (argc < 1) {
308         rrd_set_error("Not enough arguments");
309         return -1;
310     }
311
312     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313     if (rrd_file == NULL) {
314         return -1;
315     }
316
317     /* initialize time */
318     version = atoi(rrd.stat_head->version);
319     gettimeofday(&tmp_time, 0);
320     normalize_time(&tmp_time);
321     current_time = tmp_time.tv_sec;
322     if (version >= 3) {
323         current_time_usec = tmp_time.tv_usec;
324     } else {
325         current_time_usec = 0;
326     }
327
328     rra_current = rra_start = rra_begin = rrd_file->header_len;
329     /* This is defined in the ANSI C standard, section 7.9.5.3:
330
331        When a file is opened with udpate mode ('+' as the second
332        or third character in the ... list of mode argument
333        variables), both input and output may be performed on the
334        associated stream.  However, ...  input may not be directly
335        followed by output without an intervening call to a file
336        positioning function, unless the input operation encounters
337        end-of-file. */
338 #if 0                   //def HAVE_MMAP
339     rrd_filesize = rrd_file->file_size;
340     fseek(rrd_file->fd, 0, SEEK_END);
341     rrd_filesize = ftell(rrd_file->fd);
342     fseek(rrd_file->fd, rra_current, SEEK_SET);
343 #else
344 //    fseek(rrd_file->fd, 0, SEEK_CUR);
345 #endif
346
347
348     /* get exclusive lock to whole file.
349      * lock gets removed when we close the file.
350      */
351     if (LockRRD(rrd_file->fd) != 0) {
352         rrd_set_error("could not lock RRD");
353         rrd_free(&rrd);
354         close(rrd_file->fd);
355         return (-1);
356     }
357
358     if ((updvals =
359          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360         rrd_set_error("allocating updvals pointer array");
361         rrd_free(&rrd);
362         close(rrd_file->fd);
363         return (-1);
364     }
365
366     if ((pdp_temp = malloc(sizeof(rrd_value_t)
367                            * rrd.stat_head->ds_cnt)) == NULL) {
368         rrd_set_error("allocating pdp_temp ...");
369         free(updvals);
370         rrd_free(&rrd);
371         close(rrd_file->fd);
372         return (-1);
373     }
374
375     if ((tmpl_idx = malloc(sizeof(unsigned long)
376                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377         rrd_set_error("allocating tmpl_idx ...");
378         free(pdp_temp);
379         free(updvals);
380         rrd_free(&rrd);
381         close(rrd_file->fd);
382         return (-1);
383     }
384     /* initialize tmplt redirector */
385     /* default config example (assume DS 1 is a CDEF DS)
386        tmpl_idx[0] -> 0; (time)
387        tmpl_idx[1] -> 1; (DS 0)
388        tmpl_idx[2] -> 3; (DS 2)
389        tmpl_idx[3] -> 4; (DS 3) */
390     tmpl_idx[0] = 0;    /* time */
391     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
393             tmpl_idx[ii++] = i;
394     }
395     tmpl_cnt = ii;
396
397     if (tmplt) {
398         /* we should work on a writeable copy here */
399         char     *dsname;
400         unsigned int tmpl_len;
401         char     *tmplt_copy = strdup(tmplt);
402
403         dsname = tmplt_copy;
404         tmpl_cnt = 1;   /* the first entry is the time */
405         tmpl_len = strlen(tmplt_copy);
406         for (i = 0; i <= tmpl_len; i++) {
407             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408                 tmplt_copy[i] = '\0';
409                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
410                     rrd_set_error
411                         ("tmplt contains more DS definitions than RRD");
412                     free(updvals);
413                     free(pdp_temp);
414                     free(tmpl_idx);
415                     rrd_free(&rrd);
416                     close(rrd_file->fd);
417                     return (-1);
418                 }
419                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420                     rrd_set_error("unknown DS name '%s'", dsname);
421                     free(updvals);
422                     free(pdp_temp);
423                     free(tmplt_copy);
424                     free(tmpl_idx);
425                     rrd_free(&rrd);
426                     close(rrd_file->fd);
427                     return (-1);
428                 } else {
429                     /* the first element is always the time */
430                     tmpl_idx[tmpl_cnt - 1]++;
431                     /* go to the next entry on the tmplt_copy */
432                     dsname = &tmplt_copy[i + 1];
433                     /* fix the damage we did before */
434                     if (i < tmpl_len) {
435                         tmplt_copy[i] = ':';
436                     }
437
438                 }
439             }
440         }
441         free(tmplt_copy);
442     }
443     if ((pdp_new = malloc(sizeof(rrd_value_t)
444                           * rrd.stat_head->ds_cnt)) == NULL) {
445         rrd_set_error("allocating pdp_new ...");
446         free(updvals);
447         free(pdp_temp);
448         free(tmpl_idx);
449         rrd_free(&rrd);
450         close(rrd_file->fd);
451         return (-1);
452     }
453 #if 0                   //def HAVE_MMAP
454     rrd_mmaped_file = mmap(0,
455                            rrd_file->file_len,
456                            PROT_READ | PROT_WRITE,
457                            MAP_SHARED, fileno(in_file), 0);
458     if (rrd_mmaped_file == MAP_FAILED) {
459         rrd_set_error("error mmapping file %s", filename);
460         free(updvals);
461         free(pdp_temp);
462         free(tmpl_idx);
463         rrd_free(&rrd);
464         close(rrd_file->fd);
465         return (-1);
466     }
467 #ifdef USE_MADVISE
468     /* when we use mmaping we tell the kernel the mmap equivalent
469        of POSIX_FADV_RANDOM */
470     madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
471 #endif
472 #endif
473     /* loop through the arguments. */
474     for (arg_i = 0; arg_i < argc; arg_i++) {
475         char     *stepper = strdup(argv[arg_i]);
476         char     *step_start = stepper;
477         char     *p;
478         char     *parsetime_error = NULL;
479         enum { atstyle, normal } timesyntax;
480         struct rrd_time_value ds_tv;
481
482         if (stepper == NULL) {
483             rrd_set_error("failed duplication argv entry");
484             free(step_start);
485             free(updvals);
486             free(pdp_temp);
487             free(tmpl_idx);
488             rrd_free(&rrd);
489 #ifdef HAVE_MMAP
490             rrd_close(rrd_file);
491 #endif
492             close(rrd_file->fd);
493             return (-1);
494         }
495         /* initialize all ds input to unknown except the first one
496            which has always got to be set */
497         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
498             updvals[ii] = "U";
499         updvals[0] = stepper;
500         /* separate all ds elements; first must be examined separately
501            due to alternate time syntax */
502         if ((p = strchr(stepper, '@')) != NULL) {
503             timesyntax = atstyle;
504             *p = '\0';
505             stepper = p + 1;
506         } else if ((p = strchr(stepper, ':')) != NULL) {
507             timesyntax = normal;
508             *p = '\0';
509             stepper = p + 1;
510         } else {
511             rrd_set_error
512                 ("expected timestamp not found in data source from %s",
513                  argv[arg_i]);
514             free(step_start);
515             break;
516         }
517         ii = 1;
518         updvals[tmpl_idx[ii]] = stepper;
519         while (*stepper) {
520             if (*stepper == ':') {
521                 *stepper = '\0';
522                 ii++;
523                 if (ii < tmpl_cnt) {
524                     updvals[tmpl_idx[ii]] = stepper + 1;
525                 }
526             }
527             stepper++;
528         }
529
530         if (ii != tmpl_cnt - 1) {
531             rrd_set_error
532                 ("expected %lu data source readings (got %lu) from %s",
533                  tmpl_cnt - 1, ii, argv[arg_i]);
534             free(step_start);
535             break;
536         }
537
538         /* get the time from the reading ... handle N */
539         if (timesyntax == atstyle) {
540             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
542                 free(step_start);
543                 break;
544             }
545             if (ds_tv.type == RELATIVE_TO_END_TIME ||
546                 ds_tv.type == RELATIVE_TO_START_TIME) {
547                 rrd_set_error("specifying time relative to the 'start' "
548                               "or 'end' makes no sense here: %s", updvals[0]);
549                 free(step_start);
550                 break;
551             }
552
553             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
554             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
555
556         } else if (strcmp(updvals[0], "N") == 0) {
557             gettimeofday(&tmp_time, 0);
558             normalize_time(&tmp_time);
559             current_time = tmp_time.tv_sec;
560             current_time_usec = tmp_time.tv_usec;
561         } else {
562             double    tmp;
563
564             tmp = strtod(updvals[0], 0);
565             current_time = floor(tmp);
566             current_time_usec =
567                 (long) ((tmp - (double) current_time) * 1000000.0);
568         }
569         /* dont do any correction for old version RRDs */
570         if (version < 3)
571             current_time_usec = 0;
572
573         if (current_time < rrd.live_head->last_up ||
574             (current_time == rrd.live_head->last_up &&
575              (long) current_time_usec <=
576              (long) rrd.live_head->last_up_usec)) {
577             rrd_set_error("illegal attempt to update using time %ld when "
578                           "last update time is %ld (minimum one second step)",
579                           current_time, rrd.live_head->last_up);
580             free(step_start);
581             break;
582         }
583
584
585         /* seek to the beginning of the rra's */
586         if (rra_current != rra_begin) {
587 #ifndef HAVE_MMAP
588             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
589                 rrd_set_error("seek error in rrd");
590                 free(step_start);
591                 break;
592             }
593 #endif
594             rra_current = rra_begin;
595         }
596         rra_start = rra_begin;
597
598         /* when was the current pdp started */
599         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
600         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
601
602         /* when did the last pdp_st occur */
603         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
604         occu_pdp_st = current_time - occu_pdp_age;
605
606         /* interval = current_time - rrd.live_head->last_up; */
607         interval = (double) (current_time - rrd.live_head->last_up)
608             + (double) ((long) current_time_usec -
609                         (long) rrd.live_head->last_up_usec) / 1000000.0;
610
611         if (occu_pdp_st > proc_pdp_st) {
612             /* OK we passed the pdp_st moment */
613             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
614                                                                      * occurred before the latest
615                                                                      * pdp_st moment*/
616             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
617             post_int = occu_pdp_age;    /* how much after it */
618             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
619         } else {
620             pre_int = interval;
621             post_int = 0;
622         }
623
624 #ifdef DEBUG
625         printf("proc_pdp_age %lu\t"
626                "proc_pdp_st %lu\t"
627                "occu_pfp_age %lu\t"
628                "occu_pdp_st %lu\t"
629                "int %lf\t"
630                "pre_int %lf\t"
631                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
632                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
633 #endif
634
635         /* process the data sources and update the pdp_prep 
636          * area accordingly */
637         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
638             enum dst_en dst_idx;
639
640             dst_idx = dst_conv(rrd.ds_def[i].dst);
641
642             /* make sure we do not build diffs with old last_ds values */
643             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
644                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
645                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
646             }
647
648             /* NOTE: DST_CDEF should never enter this if block, because
649              * updvals[i+1][0] is initialized to 'U'; unless the caller
650              * accidently specified a value for the DST_CDEF. To handle 
651              * this case, an extra check is required. */
652
653             if ((updvals[i + 1][0] != 'U') &&
654                 (dst_idx != DST_CDEF) &&
655                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
656                 double    rate = DNAN;
657
658                 /* the data source type defines how to process the data */
659                 /* pdp_new contains rate * time ... eg the bytes
660                  * transferred during the interval. Doing it this way saves
661                  * a lot of math operations */
662
663
664                 switch (dst_idx) {
665                 case DST_COUNTER:
666                 case DST_DERIVE:
667                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
668                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
669                             if ((updvals[i + 1][ii] < '0'
670                                  || updvals[i + 1][ii] > '9') && (ii != 0
671                                                                   && updvals[i
672                                                                              +
673                                                                              1]
674                                                                   [ii] !=
675                                                                   '-')) {
676                                 rrd_set_error("not a simple integer: '%s'",
677                                               updvals[i + 1]);
678                                 break;
679                             }
680                         }
681                         if (rrd_test_error()) {
682                             break;
683                         }
684                         pdp_new[i] =
685                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
686                         if (dst_idx == DST_COUNTER) {
687                             /* simple overflow catcher suggested by Andres Kroonmaa */
688                             /* this will fail terribly for non 32 or 64 bit counters ... */
689                             /* are there any others in SNMP land ? */
690                             if (pdp_new[i] < (double) 0.0)
691                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
692                             if (pdp_new[i] < (double) 0.0)
693                                 pdp_new[i] += (double) 18446744069414584320.0;
694                             /* 2^64-2^32 */ ;
695                         }
696                         rate = pdp_new[i] / interval;
697                     } else {
698                         pdp_new[i] = DNAN;
699                     }
700                     break;
701                 case DST_ABSOLUTE:
702                     errno = 0;
703                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
704                     if (errno > 0) {
705                         rrd_set_error("converting  '%s' to float: %s",
706                                       updvals[i + 1], rrd_strerror(errno));
707                         break;
708                     };
709                     if (endptr[0] != '\0') {
710                         rrd_set_error
711                             ("conversion of '%s' to float not complete: tail '%s'",
712                              updvals[i + 1], endptr);
713                         break;
714                     }
715                     rate = pdp_new[i] / interval;
716                     break;
717                 case DST_GAUGE:
718                     errno = 0;
719                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
720                     if (errno > 0) {
721                         rrd_set_error("converting  '%s' to float: %s",
722                                       updvals[i + 1], rrd_strerror(errno));
723                         break;
724                     };
725                     if (endptr[0] != '\0') {
726                         rrd_set_error
727                             ("conversion of '%s' to float not complete: tail '%s'",
728                              updvals[i + 1], endptr);
729                         break;
730                     }
731                     rate = pdp_new[i] / interval;
732                     break;
733                 default:
734                     rrd_set_error("rrd contains unknown DS type : '%s'",
735                                   rrd.ds_def[i].dst);
736                     break;
737                 }
738                 /* break out of this for loop if the error string is set */
739                 if (rrd_test_error()) {
740                     break;
741                 }
742                 /* make sure pdp_temp is neither too large or too small
743                  * if any of these occur it becomes unknown ...
744                  * sorry folks ... */
745                 if (!isnan(rate) &&
746                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
747                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
748                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
749                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
750                     pdp_new[i] = DNAN;
751                 }
752             } else {
753                 /* no news is news all the same */
754                 pdp_new[i] = DNAN;
755             }
756
757
758             /* make a copy of the command line argument for the next run */
759 #ifdef DEBUG
760             fprintf(stderr,
761                     "prep ds[%lu]\t"
762                     "last_arg '%s'\t"
763                     "this_arg '%s'\t"
764                     "pdp_new %10.2f\n",
765                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
766 #endif
767             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
768             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
769         }
770         /* break out of the argument parsing loop if the error_string is set */
771         if (rrd_test_error()) {
772             free(step_start);
773             break;
774         }
775         /* has a pdp_st moment occurred since the last run ? */
776
777         if (proc_pdp_st == occu_pdp_st) {
778             /* no we have not passed a pdp_st moment. therefore update is simple */
779
780             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
781                 if (isnan(pdp_new[i])) {
782                     /* this is not realy accurate if we use subsecond data arival time
783                        should have thought of it when going subsecond resolution ...
784                        sorry next format change we will have it! */
785                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
786                         floor(interval);
787                 } else {
788                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
789                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
790                     } else {
791                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
792                     }
793                 }
794 #ifdef DEBUG
795                 fprintf(stderr,
796                         "NO PDP  ds[%lu]\t"
797                         "value %10.2f\t"
798                         "unkn_sec %5lu\n",
799                         i,
800                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
801                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
802 #endif
803             }
804         } else {
805             /* an pdp_st has occurred. */
806
807             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
808              * occurred up to the last run.        
809              pdp_new[] contains rate*seconds from the latest run.
810              pdp_temp[] will contain the rate for cdp */
811
812             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
813                 /* update pdp_prep to the current pdp_st. */
814                 double    pre_unknown = 0.0;
815
816                 if (isnan(pdp_new[i]))
817                     /* a final bit of unkonwn to be added bevore calculation
818                      * we use a tempaorary variable for this so that we 
819                      * don't have to turn integer lines before using the value */
820                     pre_unknown = pre_int;
821                 else {
822                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
823                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
824                             pdp_new[i] / interval * pre_int;
825                     } else {
826                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
827                             pdp_new[i] / interval * pre_int;
828                     }
829                 }
830
831
832                 /* if too much of the pdp_prep is unknown we dump it */
833                 if (
834                        /* removed because this does not agree with the definition
835                           a heart beat can be unknown */
836                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
837                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
838                        /* if the interval is larger thatn mrhb we get NAN */
839                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
840                        (occu_pdp_st - proc_pdp_st <=
841                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
842                     pdp_temp[i] = DNAN;
843                 } else {
844                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
845                         / ((double) (occu_pdp_st - proc_pdp_st
846                                      -
847                                      rrd.pdp_prep[i].
848                                      scratch[PDP_unkn_sec_cnt].u_cnt)
849                            - pre_unknown);
850                 }
851
852                 /* process CDEF data sources; remember each CDEF DS can
853                  * only reference other DS with a lower index number */
854                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
855                     rpnp_t   *rpnp;
856
857                     rpnp =
858                         rpn_expand((rpn_cdefds_t *) &
859                                    (rrd.ds_def[i].par[DS_cdef]));
860                     /* substitue data values for OP_VARIABLE nodes */
861                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
862                         if (rpnp[ii].op == OP_VARIABLE) {
863                             rpnp[ii].op = OP_NUMBER;
864                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
865                         }
866                     }
867                     /* run the rpn calculator */
868                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
869                         free(rpnp);
870                         break;  /* exits the data sources pdp_temp loop */
871                     }
872                 }
873
874                 /* make pdp_prep ready for the next run */
875                 if (isnan(pdp_new[i])) {
876                     /* this is not realy accurate if we use subsecond data arival time
877                        should have thought of it when going subsecond resolution ...
878                        sorry next format change we will have it! */
879                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
880                         floor(post_int);
881                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
882                 } else {
883                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
884                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
885                         pdp_new[i] / interval * post_int;
886                 }
887
888 #ifdef DEBUG
889                 fprintf(stderr,
890                         "PDP UPD ds[%lu]\t"
891                         "pdp_temp %10.2f\t"
892                         "new_prep %10.2f\t"
893                         "new_unkn_sec %5lu\n",
894                         i, pdp_temp[i],
895                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
896                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
897 #endif
898             }
899
900             /* if there were errors during the last loop, bail out here */
901             if (rrd_test_error()) {
902                 free(step_start);
903                 break;
904             }
905
906             /* compute the number of elapsed pdp_st moments */
907             elapsed_pdp_st =
908                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
909 #ifdef DEBUG
910             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
911 #endif
912             if (rra_step_cnt == NULL) {
913                 rra_step_cnt = (unsigned long *)
914                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
915             }
916
917             for (i = 0, rra_start = rra_begin;
918                  i < rrd.stat_head->rra_cnt;
919                  rra_start +=
920                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
921                  sizeof(rrd_value_t), i++) {
922                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
923                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
924                     (proc_pdp_st / rrd.stat_head->pdp_step) %
925                     rrd.rra_def[i].pdp_cnt;
926                 if (start_pdp_offset <= elapsed_pdp_st) {
927                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
928                         rrd.rra_def[i].pdp_cnt + 1;
929                 } else {
930                     rra_step_cnt[i] = 0;
931                 }
932
933                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
934                     /* If this is a bulk update, we need to skip ahead in the seasonal
935                      * arrays so that they will be correct for the next observed value;
936                      * note that for the bulk update itself, no update will occur to
937                      * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
938                      * be set to DNAN. */
939                     if (rra_step_cnt[i] > 2) {
940                         /* skip update by resetting rra_step_cnt[i],
941                          * note that this is not data source specific; this is due
942                          * to the bulk update, not a DNAN value for the specific data
943                          * source. */
944                         rra_step_cnt[i] = 0;
945                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
946                                         elapsed_pdp_st, &last_seasonal_coef);
947                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
948                                         elapsed_pdp_st + 1, &seasonal_coef);
949                     }
950
951                     /* periodically run a smoother for seasonal effects */
952                     /* Need to use first cdp parameter buffer to track
953                      * burnin (burnin requires a specific smoothing schedule).
954                      * The CDP_init_seasonal parameter is really an RRA level,
955                      * not a data source within RRA level parameter, but the rra_def
956                      * is read only for rrd_update (not flushed to disk). */
957                     iii = i * (rrd.stat_head->ds_cnt);
958                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
959                         <= BURNIN_CYCLES) {
960                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
961                             > rrd.rra_def[i].row_cnt - 1) {
962                             /* mark off one of the burnin cycles */
963                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
964                                u_cnt);
965                             schedule_smooth = 1;
966                         }
967                     } else {
968                         /* someone has no doubt invented a trick to deal with this
969                          * wrap around, but at least this code is clear. */
970                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
971                             u_cnt > rrd.rra_ptr[i].cur_row) {
972                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
973                              * mapping between PDP and CDP */
974                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
975                                 >=
976                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
977                                 u_cnt) {
978 #ifdef DEBUG
979                                 fprintf(stderr,
980                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
981                                         rrd.rra_ptr[i].cur_row,
982                                         elapsed_pdp_st,
983                                         rrd.rra_def[i].
984                                         par[RRA_seasonal_smooth_idx].u_cnt);
985 #endif
986                                 schedule_smooth = 1;
987                             }
988                         } else {
989                             /* can't rely on negative numbers because we are working with
990                              * unsigned values */
991                             /* Don't need modulus here. If we've wrapped more than once, only
992                              * one smooth is executed at the end. */
993                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
994                                 rrd.rra_def[i].row_cnt
995                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
996                                 rrd.rra_def[i].row_cnt >=
997                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
998                                 u_cnt) {
999 #ifdef DEBUG
1000                                 fprintf(stderr,
1001                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1002                                         rrd.rra_ptr[i].cur_row,
1003                                         elapsed_pdp_st,
1004                                         rrd.rra_def[i].
1005                                         par[RRA_seasonal_smooth_idx].u_cnt);
1006 #endif
1007                                 schedule_smooth = 1;
1008                             }
1009                         }
1010                     }
1011
1012                     rra_current = rrd_tell(rrd_file);
1013                 }
1014                 /* if cf is DEVSEASONAL or SEASONAL */
1015                 if (rrd_test_error())
1016                     break;
1017
1018                 /* update CDP_PREP areas */
1019                 /* loop over data soures within each RRA */
1020                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1021
1022                     /* iii indexes the CDP prep area for this data source within the RRA */
1023                     iii = i * rrd.stat_head->ds_cnt + ii;
1024
1025                     if (rrd.rra_def[i].pdp_cnt > 1) {
1026
1027                         if (rra_step_cnt[i] > 0) {
1028                             /* If we are in this block, as least 1 CDP value will be written to
1029                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
1030                              * to be written, then the "fill in" value is the CDP_secondary_val
1031                              * entry. */
1032                             if (isnan(pdp_temp[ii])) {
1033                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1034                                     u_cnt += start_pdp_offset;
1035                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1036                                     u_val = DNAN;
1037                             } else {
1038                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
1039                                  * CDP data entries. No matter the CF, the value is the same because
1040                                  * the average, max, min, and last of a list of identical values is
1041                                  * the same, namely, the value itself. */
1042                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1043                                     u_val = pdp_temp[ii];
1044                             }
1045
1046                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1047                                 u_cnt >
1048                                 rrd.rra_def[i].pdp_cnt *
1049                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1050                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1051                                     u_val = DNAN;
1052                                 /* initialize carry over */
1053                                 if (current_cf == CF_AVERAGE) {
1054                                     if (isnan(pdp_temp[ii])) {
1055                                         rrd.cdp_prep[iii].scratch[CDP_val].
1056                                             u_val = DNAN;
1057                                     } else {
1058                                         rrd.cdp_prep[iii].scratch[CDP_val].
1059                                             u_val =
1060                                             pdp_temp[ii] *
1061                                             ((elapsed_pdp_st -
1062                                               start_pdp_offset) %
1063                                              rrd.rra_def[i].pdp_cnt);
1064                                     }
1065                                 } else {
1066                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1067                                         pdp_temp[ii];
1068                                 }
1069                             } else {
1070                                 rrd_value_t cum_val, cur_val;
1071
1072                                 switch (current_cf) {
1073                                 case CF_AVERAGE:
1074                                     cum_val =
1075                                         IFDNAN(rrd.cdp_prep[iii].
1076                                                scratch[CDP_val].u_val, 0.0);
1077                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1078                                     rrd.cdp_prep[iii].
1079                                         scratch[CDP_primary_val].u_val =
1080                                         (cum_val +
1081                                          cur_val * start_pdp_offset) /
1082                                         (rrd.rra_def[i].pdp_cnt -
1083                                          rrd.cdp_prep[iii].
1084                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1085                                     /* initialize carry over value */
1086                                     if (isnan(pdp_temp[ii])) {
1087                                         rrd.cdp_prep[iii].scratch[CDP_val].
1088                                             u_val = DNAN;
1089                                     } else {
1090                                         rrd.cdp_prep[iii].scratch[CDP_val].
1091                                             u_val =
1092                                             pdp_temp[ii] *
1093                                             ((elapsed_pdp_st -
1094                                               start_pdp_offset) %
1095                                              rrd.rra_def[i].pdp_cnt);
1096                                     }
1097                                     break;
1098                                 case CF_MAXIMUM:
1099                                     cum_val =
1100                                         IFDNAN(rrd.cdp_prep[iii].
1101                                                scratch[CDP_val].u_val, -DINF);
1102                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1103 #ifdef DEBUG
1104                                     if (isnan
1105                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1106                                          u_val) && isnan(pdp_temp[ii])) {
1107                                         fprintf(stderr,
1108                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1109                                                 i, ii);
1110                                         exit(-1);
1111                                     }
1112 #endif
1113                                     if (cur_val > cum_val)
1114                                         rrd.cdp_prep[iii].
1115                                             scratch[CDP_primary_val].u_val =
1116                                             cur_val;
1117                                     else
1118                                         rrd.cdp_prep[iii].
1119                                             scratch[CDP_primary_val].u_val =
1120                                             cum_val;
1121                                     /* initialize carry over value */
1122                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1123                                         pdp_temp[ii];
1124                                     break;
1125                                 case CF_MINIMUM:
1126                                     cum_val =
1127                                         IFDNAN(rrd.cdp_prep[iii].
1128                                                scratch[CDP_val].u_val, DINF);
1129                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1130 #ifdef DEBUG
1131                                     if (isnan
1132                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1133                                          u_val) && isnan(pdp_temp[ii])) {
1134                                         fprintf(stderr,
1135                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1136                                                 i, ii);
1137                                         exit(-1);
1138                                     }
1139 #endif
1140                                     if (cur_val < cum_val)
1141                                         rrd.cdp_prep[iii].
1142                                             scratch[CDP_primary_val].u_val =
1143                                             cur_val;
1144                                     else
1145                                         rrd.cdp_prep[iii].
1146                                             scratch[CDP_primary_val].u_val =
1147                                             cum_val;
1148                                     /* initialize carry over value */
1149                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1150                                         pdp_temp[ii];
1151                                     break;
1152                                 case CF_LAST:
1153                                 default:
1154                                     rrd.cdp_prep[iii].
1155                                         scratch[CDP_primary_val].u_val =
1156                                         pdp_temp[ii];
1157                                     /* initialize carry over value */
1158                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1159                                         pdp_temp[ii];
1160                                     break;
1161                                 }
1162                             }   /* endif meets xff value requirement for a valid value */
1163                             /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1164                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1165                             if (isnan(pdp_temp[ii]))
1166                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1167                                     u_cnt =
1168                                     (elapsed_pdp_st -
1169                                      start_pdp_offset) %
1170                                     rrd.rra_def[i].pdp_cnt;
1171                             else
1172                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1173                                     u_cnt = 0;
1174                         } else {    /* rra_step_cnt[i]  == 0 */
1175
1176 #ifdef DEBUG
1177                             if (isnan
1178                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1179                                 fprintf(stderr,
1180                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1181                                         i, ii);
1182                             } else {
1183                                 fprintf(stderr,
1184                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1185                                         i, ii,
1186                                         rrd.cdp_prep[iii].scratch[CDP_val].
1187                                         u_val);
1188                             }
1189 #endif
1190                             if (isnan(pdp_temp[ii])) {
1191                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1192                                     u_cnt += elapsed_pdp_st;
1193                             } else
1194                                 if (isnan
1195                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1196                                      u_val)) {
1197                                 if (current_cf == CF_AVERAGE) {
1198                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1199                                         pdp_temp[ii] * elapsed_pdp_st;
1200                                 } else {
1201                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1202                                         pdp_temp[ii];
1203                                 }
1204 #ifdef DEBUG
1205                                 fprintf(stderr,
1206                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1207                                         i, ii,
1208                                         rrd.cdp_prep[iii].scratch[CDP_val].
1209                                         u_val);
1210 #endif
1211                             } else {
1212                                 switch (current_cf) {
1213                                 case CF_AVERAGE:
1214                                     rrd.cdp_prep[iii].scratch[CDP_val].
1215                                         u_val +=
1216                                         pdp_temp[ii] * elapsed_pdp_st;
1217                                     break;
1218                                 case CF_MINIMUM:
1219                                     if (pdp_temp[ii] <
1220                                         rrd.cdp_prep[iii].scratch[CDP_val].
1221                                         u_val)
1222                                         rrd.cdp_prep[iii].scratch[CDP_val].
1223                                             u_val = pdp_temp[ii];
1224                                     break;
1225                                 case CF_MAXIMUM:
1226                                     if (pdp_temp[ii] >
1227                                         rrd.cdp_prep[iii].scratch[CDP_val].
1228                                         u_val)
1229                                         rrd.cdp_prep[iii].scratch[CDP_val].
1230                                             u_val = pdp_temp[ii];
1231                                     break;
1232                                 case CF_LAST:
1233                                 default:
1234                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1235                                         pdp_temp[ii];
1236                                     break;
1237                                 }
1238                             }
1239                         }
1240                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1241                         if (elapsed_pdp_st > 2) {
1242                             switch (current_cf) {
1243                             case CF_AVERAGE:
1244                             default:
1245                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1246                                     u_val = pdp_temp[ii];
1247                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1248                                     u_val = pdp_temp[ii];
1249                                 break;
1250                             case CF_SEASONAL:
1251                             case CF_DEVSEASONAL:
1252                                 /* need to update cached seasonal values, so they are consistent
1253                                  * with the bulk update */
1254                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1255                                  * CDP_last_deviation are the same. */
1256                                 rrd.cdp_prep[iii].
1257                                     scratch[CDP_hw_last_seasonal].u_val =
1258                                     last_seasonal_coef[ii];
1259                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1260                                     u_val = seasonal_coef[ii];
1261                                 break;
1262                             case CF_HWPREDICT:
1263                                 /* need to update the null_count and last_null_count.
1264                                  * even do this for non-DNAN pdp_temp because the
1265                                  * algorithm is not learning from batch updates. */
1266                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1267                                     u_cnt += elapsed_pdp_st;
1268                                 rrd.cdp_prep[iii].
1269                                     scratch[CDP_last_null_count].u_cnt +=
1270                                     elapsed_pdp_st - 1;
1271                                 /* fall through */
1272                             case CF_DEVPREDICT:
1273                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1274                                     u_val = DNAN;
1275                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1276                                     u_val = DNAN;
1277                                 break;
1278                             case CF_FAILURES:
1279                                 /* do not count missed bulk values as failures */
1280                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1281                                     u_val = 0;
1282                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1283                                     u_val = 0;
1284                                 /* need to reset violations buffer.
1285                                  * could do this more carefully, but for now, just
1286                                  * assume a bulk update wipes away all violations. */
1287                                 erase_violations(&rrd, iii, i);
1288                                 break;
1289                             }
1290                         }
1291                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1292
1293                     if (rrd_test_error())
1294                         break;
1295
1296                 }       /* endif data sources loop */
1297             }           /* end RRA Loop */
1298
1299             /* this loop is only entered if elapsed_pdp_st < 3 */
1300             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1301                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1302                 for (i = 0, rra_start = rra_begin;
1303                      i < rrd.stat_head->rra_cnt;
1304                      rra_start +=
1305                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1306                      sizeof(rrd_value_t), i++) {
1307                     if (rrd.rra_def[i].pdp_cnt > 1)
1308                         continue;
1309
1310                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1311                     if (current_cf == CF_SEASONAL
1312                         || current_cf == CF_DEVSEASONAL) {
1313                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1314                                         elapsed_pdp_st + (scratch_idx ==
1315                                                           CDP_primary_val ? 1
1316                                                           : 2),
1317                                         &seasonal_coef);
1318                         rra_current = rrd_tell(rrd_file);
1319                     }
1320                     if (rrd_test_error())
1321                         break;
1322                     /* loop over data soures within each RRA */
1323                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1324                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1325                                            i * (rrd.stat_head->ds_cnt) + ii,
1326                                            i, ii, scratch_idx, seasonal_coef);
1327                     }
1328                 }       /* end RRA Loop */
1329                 if (rrd_test_error())
1330                     break;
1331             }           /* end elapsed_pdp_st loop */
1332
1333             if (rrd_test_error())
1334                 break;
1335
1336             /* Ready to write to disk */
1337             /* Move sequentially through the file, writing one RRA at a time.
1338              * Note this architecture divorces the computation of CDP with
1339              * flushing updated RRA entries to disk. */
1340             for (i = 0, rra_start = rra_begin;
1341                  i < rrd.stat_head->rra_cnt;
1342                  rra_start +=
1343                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1344                  sizeof(rrd_value_t), i++) {
1345                 /* is th5Aere anything to write for this RRA? If not, continue. */
1346                 if (rra_step_cnt[i] == 0)
1347                     continue;
1348
1349                 /* write the first row */
1350 #ifdef DEBUG
1351                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1352 #endif
1353                 rrd.rra_ptr[i].cur_row++;
1354                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1355                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1356                 /* positition on the first row */
1357                 rra_pos_tmp = rra_start +
1358                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1359                     sizeof(rrd_value_t);
1360                 if (rra_pos_tmp != rra_current) {
1361 #ifndef HAVE_MMAP
1362                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1363                         rrd_set_error("seek error in rrd");
1364                         break;
1365                     }
1366 #endif
1367                     rra_current = rra_pos_tmp;
1368                 }
1369 #ifdef DEBUG
1370                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1371 #endif
1372                 scratch_idx = CDP_primary_val;
1373                 if (pcdp_summary != NULL) {
1374                     rra_time = (current_time - current_time
1375                                 % (rrd.rra_def[i].pdp_cnt *
1376                                    rrd.stat_head->pdp_step))
1377                         -
1378                         ((rra_step_cnt[i] -
1379                           1) * rrd.rra_def[i].pdp_cnt *
1380                          rrd.stat_head->pdp_step);
1381                 }
1382 #ifdef HAVE_MMAP
1383                 pcdp_summary =
1384                     write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1385                                   rrd_file->fd, pcdp_summary, &rra_time,
1386                                   rrd_file->file_start);
1387 #else
1388                 pcdp_summary =
1389                     write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1390                                   rrd_file->fd, pcdp_summary, &rra_time);
1391 #endif
1392                 if (rrd_test_error())
1393                     break;
1394
1395                 /* write other rows of the bulk update, if any */
1396                 scratch_idx = CDP_secondary_val;
1397                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1398                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1399 #ifdef DEBUG
1400                         fprintf(stderr,
1401                                 "Wraparound for RRA %s, %lu updates left\n",
1402                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1403 #endif
1404                         /* wrap */
1405                         rrd.rra_ptr[i].cur_row = 0;
1406                         /* seek back to beginning of current rra */
1407                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1408                             rrd_set_error("seek error in rrd");
1409                             break;
1410                         }
1411 #ifdef DEBUG
1412                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1413                                 rrd_file->pos);
1414 #endif
1415                         rra_current = rra_start;
1416                     }
1417                     if (pcdp_summary != NULL) {
1418                         rra_time = (current_time - current_time
1419                                     % (rrd.rra_def[i].pdp_cnt *
1420                                        rrd.stat_head->pdp_step))
1421                             -
1422                             ((rra_step_cnt[i] -
1423                               2) * rrd.rra_def[i].pdp_cnt *
1424                              rrd.stat_head->pdp_step);
1425                     }
1426 #ifdef HAVE_MMAP
1427                     pcdp_summary =
1428                         write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1429                                       rrd_file->fd, pcdp_summary, &rra_time,
1430                                       rrd_file->file_start);
1431 #else
1432                     pcdp_summary =
1433                         write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1434                                       rrd_file->fd, pcdp_summary, &rra_time);
1435 #endif
1436                 }
1437
1438                 if (rrd_test_error())
1439                     break;
1440             }           /* RRA LOOP */
1441
1442             /* break out of the argument parsing loop if error_string is set */
1443             if (rrd_test_error()) {
1444                 free(step_start);
1445                 break;
1446             }
1447
1448         }               /* endif a pdp_st has occurred */
1449         rrd.live_head->last_up = current_time;
1450         rrd.live_head->last_up_usec = current_time_usec;
1451         free(step_start);
1452     }                   /* function argument loop */
1453
1454     if (seasonal_coef != NULL)
1455         free(seasonal_coef);
1456     if (last_seasonal_coef != NULL)
1457         free(last_seasonal_coef);
1458     if (rra_step_cnt != NULL)
1459         free(rra_step_cnt);
1460     rpnstack_free(&rpnstack);
1461
1462 #ifdef HAVE_MMAP
1463     if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1464         rrd_set_error("error writing(unmapping) file: %s", filename);
1465     }
1466 #endif
1467     /* if we got here and if there is an error and if the file has not been
1468      * written to, then close things up and return. */
1469     if (rrd_test_error()) {
1470         free(updvals);
1471         free(tmpl_idx);
1472         rrd_free(&rrd);
1473         free(pdp_temp);
1474         free(pdp_new);
1475         close(rrd_file->fd);
1476         return (-1);
1477     }
1478
1479     /* aargh ... that was tough ... so many loops ... anyway, its done.
1480      * we just need to write back the live header portion now*/
1481
1482     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1483                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1484                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1485                  SEEK_SET) != 0) {
1486         rrd_set_error("seek rrd for live header writeback");
1487         free(updvals);
1488         free(tmpl_idx);
1489         rrd_free(&rrd);
1490         free(pdp_temp);
1491         free(pdp_new);
1492         close(rrd_file->fd);
1493         return (-1);
1494     }
1495
1496     if (version >= 3) {
1497         if (rrd_write(rrd_file, rrd.live_head,
1498                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1499             rrd_set_error("rrd_write live_head to rrd");
1500             free(updvals);
1501             rrd_free(&rrd);
1502             free(tmpl_idx);
1503             free(pdp_temp);
1504             free(pdp_new);
1505             close(rrd_file->fd);
1506             return (-1);
1507         }
1508     } else {
1509         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1510                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1511             rrd_set_error("rrd_write live_head to rrd");
1512             free(updvals);
1513             rrd_free(&rrd);
1514             free(tmpl_idx);
1515             free(pdp_temp);
1516             free(pdp_new);
1517             close(rrd_file->fd);
1518             return (-1);
1519         }
1520     }
1521
1522
1523     if (rrd_write(rrd_file, rrd.pdp_prep,
1524                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1525         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1526         rrd_set_error("rrd_write pdp_prep to rrd");
1527         free(updvals);
1528         rrd_free(&rrd);
1529         free(tmpl_idx);
1530         free(pdp_temp);
1531         free(pdp_new);
1532         close(rrd_file->fd);
1533         return (-1);
1534     }
1535
1536     if (rrd_write(rrd_file, rrd.cdp_prep,
1537                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1538                   rrd.stat_head->ds_cnt)
1539         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1540                       rrd.stat_head->ds_cnt)) {
1541
1542         rrd_set_error("rrd_write cdp_prep to rrd");
1543         free(updvals);
1544         free(tmpl_idx);
1545         rrd_free(&rrd);
1546         free(pdp_temp);
1547         free(pdp_new);
1548         close(rrd_file->fd);
1549         return (-1);
1550     }
1551
1552     if (rrd_write(rrd_file, rrd.rra_ptr,
1553                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1554         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1555         rrd_set_error("rrd_write rra_ptr to rrd");
1556         free(updvals);
1557         free(tmpl_idx);
1558         rrd_free(&rrd);
1559         free(pdp_temp);
1560         free(pdp_new);
1561         close(rrd_file->fd);
1562         return (-1);
1563     }
1564 #ifdef HAVE_POSIX_FADVISExxx
1565
1566     /* with update we have write ops, so they will probably not be done by now, this means
1567        the buffers will not get freed. But calling this for the whole file - header
1568        will let the data off the hook as soon as it is written when if it is from a previous
1569        update cycle. Calling fdsync to force things is much too hard here. */
1570
1571     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1572         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1573                       rrd_strerror(errno));
1574         close(rrd_file->fd);
1575         return (-1);
1576     }
1577 #endif
1578     /*XXX: ? */ rrd_flush(rrd_file);
1579
1580     /* calling the smoothing code here guarantees at most
1581      * one smoothing operation per rrd_update call. Unfortunately,
1582      * it is possible with bulk updates, or a long-delayed update
1583      * for smoothing to occur off-schedule. This really isn't
1584      * critical except during the burning cycles. */
1585     if (schedule_smooth) {
1586 //    in_file = fopen(filename,"rb+");
1587
1588
1589         rra_start = rra_begin;
1590         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1591             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1592                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1593 #ifdef DEBUG
1594                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1595 #endif
1596                 apply_smoother(&rrd, i, rra_start, rrd_file);
1597                 if (rrd_test_error())
1598                     break;
1599             }
1600             rra_start += rrd.rra_def[i].row_cnt
1601                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1602         }
1603 #ifdef HAVE_POSIX_FADVISExxx
1604         /* same procedure as above ... */
1605         if (0 !=
1606             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1607             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1608                           rrd_strerror(errno));
1609             close(rrd_file->fd);
1610             return (-1);
1611         }
1612 #endif
1613         close(rrd_file->fd);
1614     }
1615
1616     /* OK now close the files and free the memory */
1617     if (close(rrd_file->fd) != 0) {
1618         rrd_set_error("closing rrd");
1619         free(updvals);
1620         free(tmpl_idx);
1621         rrd_free(&rrd);
1622         free(pdp_temp);
1623         free(pdp_new);
1624         return (-1);
1625     }
1626
1627     rrd_free(&rrd);
1628     free(updvals);
1629     free(tmpl_idx);
1630     free(pdp_new);
1631     free(pdp_temp);
1632     return (0);
1633 }
1634
1635 /*
1636  * get exclusive lock to whole file.
1637  * lock gets removed when we close the file
1638  *
1639  * returns 0 on success
1640  */
1641 int LockRRD(
1642     int in_file)
1643 {
1644     int       rcstat;
1645
1646     {
1647 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1648         struct _stat st;
1649
1650         if (_fstat(in_file, &st) == 0) {
1651             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1652         } else {
1653             rcstat = -1;
1654         }
1655 #else
1656         struct flock lock;
1657
1658         lock.l_type = F_WRLCK;  /* exclusive write lock */
1659         lock.l_len = 0; /* whole file */
1660         lock.l_start = 0;   /* start of file */
1661         lock.l_whence = SEEK_SET;   /* end of file */
1662
1663         rcstat = fcntl(in_file, F_SETLK, &lock);
1664 #endif
1665     }
1666
1667     return (rcstat);
1668 }
1669
1670
1671 #ifdef HAVE_MMAP
1672 info_t
1673
1674
1675
1676
1677
1678
1679
1680
1681          *write_RRA_row(
1682     rrd_t *rrd,
1683     unsigned long rra_idx,
1684     unsigned long *rra_current,
1685     unsigned short CDP_scratch_idx,
1686 #ifndef DEBUG
1687     int UNUSED(in_file),
1688 #else
1689     int in_file,
1690 #endif
1691     info_t *pcdp_summary,
1692     time_t *rra_time,
1693     void *rrd_mmaped_file)
1694 #else
1695 info_t
1696
1697
1698
1699
1700
1701
1702
1703
1704          *write_RRA_row(
1705     rrd_t *rrd,
1706     unsigned long rra_idx,
1707     unsigned long *rra_current,
1708     unsigned short CDP_scratch_idx,
1709     int in_file,
1710     info_t *pcdp_summary,
1711     time_t *rra_time)
1712 #endif
1713 {
1714     unsigned long ds_idx, cdp_idx;
1715     infoval   iv;
1716
1717     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1718         /* compute the cdp index */
1719         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1720 #ifdef DEBUG
1721         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1722                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1723                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1724 #endif
1725         if (pcdp_summary != NULL) {
1726             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1727             /* append info to the return hash */
1728             pcdp_summary = info_push(pcdp_summary,
1729                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1730                                                    *rra_time,
1731                                                    rrd->rra_def[rra_idx].
1732                                                    cf_nam,
1733                                                    rrd->rra_def[rra_idx].
1734                                                    pdp_cnt,
1735                                                    rrd->ds_def[ds_idx].
1736                                                    ds_nam), RD_I_VAL, iv);
1737         }
1738 #ifdef HAVE_MMAP
1739         memcpy((char *) rrd_mmaped_file + *rra_current,
1740                &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1741                sizeof(rrd_value_t));
1742 #else
1743         if (rrd_write
1744             (rrd_file,
1745              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1746              sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1747             rrd_set_error("writing rrd");
1748             return 0;
1749         }
1750 #endif
1751         *rra_current += sizeof(rrd_value_t);
1752     }
1753     return (pcdp_summary);
1754 }