Fix for HoltWinters phase-shift bug described below.
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
38
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
60
61 /* FUNCTION PROTOTYPES */
62
63 int       rrd_update_r(
64     const char *filename,
65     const char *tmplt,
66     int argc,
67     const char **argv);
68 int       _rrd_update(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv,
73     info_t *);
74
75 static int allocate_data_structures(
76     rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
77     long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
78     unsigned long **skip_update, rrd_value_t **pdp_new);
79
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81     unsigned long *tmpl_cnt, long *tmpl_idx);
82
83 static int process_arg(
84     char          *step_start, 
85     rrd_t         *rrd, 
86     rrd_file_t    *rrd_file, 
87     unsigned long  rra_begin,
88     unsigned long *rra_current,
89     time_t        *current_time,
90     unsigned long *current_time_usec,
91     rrd_value_t   *pdp_temp,
92     rrd_value_t   *pdp_new,
93     unsigned long *rra_step_cnt,
94     char         **updvals,
95     long          *tmpl_idx,
96     unsigned long  tmpl_cnt,
97     info_t       **pcdp_summary,
98     int            version,
99     unsigned long *skip_update,
100     int           *schedule_smooth);
101
102 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
103     unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec, 
104     int version);
105
106 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals, 
107     time_t *current_time, unsigned long *current_time_usec, int version);
108
109 static int update_pdp_prep(rrd_t *rrd, char **updvals, 
110     rrd_value_t *pdp_new, double interval);
111
112 static int calculate_elapsed_steps(rrd_t *rrd, 
113     unsigned long current_time, unsigned long current_time_usec,
114     double interval, double *pre_int, double *post_int, 
115     unsigned long *proc_pdp_cnt);
116
117 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
118
119 static int process_all_pdp_st(rrd_t *rrd, double interval, 
120     double pre_int, double post_int, unsigned long elapsed_pdp_st, 
121     rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
122
123 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
124     double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new, 
125     rrd_value_t *pdp_temp);
126
127 static int update_all_cdp_prep(
128     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
129     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
130     rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
131     rrd_value_t *pdp_temp, unsigned long *rra_current, 
132     unsigned long *skip_update, int *schedule_smooth);
133
134 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, 
135     unsigned long elapsed_pdp_st);
136
137 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st, 
138     unsigned long start_pdp_offset, unsigned long *rra_step_cnt, 
139     int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
140     rrd_value_t *seasonal_coef, int current_cf);
141
142 static void update_cdp(unival *scratch, int current_cf, 
143     rrd_value_t pdp_temp_val, unsigned long rra_step_cnt, 
144     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
145     unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
146
147 static void initialize_cdp_val(unival *scratch, int current_cf,
148     rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st, 
149     unsigned long start_pdp_offset, unsigned long pdp_cnt);
150
151 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st, 
152     rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
153     rrd_value_t *seasonal_coef, 
154     int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
155
156 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
157     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
158     unsigned long pdp_cnt);
159
160 static rrd_value_t calculate_cdp_val(
161     rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
162     unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
163
164 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file, 
165     unsigned long rra_begin, unsigned long *rra_current, 
166     unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
167
168 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
169     unsigned long *rra_step_cnt, unsigned long rra_begin, 
170     unsigned long *rra_current, time_t current_time, 
171     unsigned long *skip_update, info_t **pcdp_summary);
172
173 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
174     unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
175     time_t rra_time);
176
177 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
178     unsigned long rra_begin);
179
180 #ifndef HAVE_MMAP
181 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file, 
182     int version);
183 #endif
184
185 /*
186  * normalize time as returned by gettimeofday. usec part must
187  * be always >= 0
188  */
189 static inline void normalize_time(
190     struct timeval *t)
191 {
192     if (t->tv_usec < 0) {
193         t->tv_sec--;
194         t->tv_usec += 1e6L;
195     }
196 }
197
198 /*
199  * Sets current_time and current_time_usec based on the current time.
200  * current_time_usec is set to 0 if the version number is 1 or 2.
201  */
202 static inline void initialize_time(
203     time_t *current_time, unsigned long *current_time_usec,
204     int version) 
205 {
206     struct timeval tmp_time;    /* used for time conversion */
207
208     gettimeofday(&tmp_time, 0);
209     normalize_time(&tmp_time);
210     *current_time = tmp_time.tv_sec;
211     if (version >= 3) {
212         *current_time_usec = tmp_time.tv_usec;
213     } else {
214         *current_time_usec = 0;
215     }
216 }
217
218 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
219
220 info_t   *rrd_update_v(
221     int argc,
222     char **argv)
223 {
224     char     *tmplt = NULL;
225     info_t   *result = NULL;
226     infoval   rc;
227     struct option long_options[] = {
228         {"template", required_argument, 0, 't'},
229         {0, 0, 0, 0}
230     };
231
232     rc.u_int = -1;
233     optind = 0;
234     opterr = 0;         /* initialize getopt */
235
236     while (1) {
237         int       option_index = 0;
238         int       opt;
239
240         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
241
242         if (opt == EOF)
243             break;
244
245         switch (opt) {
246         case 't':
247             tmplt = optarg;
248             break;
249
250         case '?':
251             rrd_set_error("unknown option '%s'", argv[optind - 1]);
252             goto end_tag;
253         }
254     }
255
256     /* need at least 2 arguments: filename, data. */
257     if (argc - optind < 2) {
258         rrd_set_error("Not enough arguments");
259         goto end_tag;
260     }
261     rc.u_int = 0;
262     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
263     rc.u_int = _rrd_update(argv[optind], tmplt,
264                            argc - optind - 1,
265                            (const char **) (argv + optind + 1), result);
266     result->value.u_int = rc.u_int;
267   end_tag:
268     return result;
269 }
270
271 int rrd_update(
272     int argc,
273     char **argv)
274 {
275     struct option long_options[] = {
276         {"template", required_argument, 0, 't'},
277         {0, 0, 0, 0}
278     };
279     int       option_index = 0;
280     int       opt;
281     char     *tmplt = NULL;
282     int       rc;
283
284     optind = 0;
285     opterr = 0;         /* initialize getopt */
286
287     while (1) {
288         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
289
290         if (opt == EOF)
291             break;
292
293         switch (opt) {
294         case 't':
295             tmplt = strdup(optarg);
296             break;
297
298         case '?':
299             rrd_set_error("unknown option '%s'", argv[optind - 1]);
300             return (-1);
301         }
302     }
303
304     /* need at least 2 arguments: filename, data. */
305     if (argc - optind < 2) {
306         rrd_set_error("Not enough arguments");
307
308         return -1;
309     }
310
311     rc = rrd_update_r(argv[optind], tmplt,
312                       argc - optind - 1, (const char **) (argv + optind + 1));
313     free(tmplt);
314     return rc;
315 }
316
317 int rrd_update_r(
318     const char *filename,
319     const char *tmplt,
320     int argc,
321     const char **argv)
322 {
323     return _rrd_update(filename, tmplt, argc, argv, NULL);
324 }
325
326 int _rrd_update(
327     const char *filename,
328     const char *tmplt,
329     int argc,
330     const char **argv,
331     info_t *pcdp_summary)
332 {
333
334     int           arg_i = 2;
335
336     unsigned long rra_begin;    /* byte pointer to the rra
337                                  * area in the rrd file.  this
338                                  * pointer never changes value */
339     unsigned long rra_current;  /* byte pointer to the current write
340                                  * spot in the rrd file. */
341     rrd_value_t   *pdp_new;     /* prepare the incoming data to be added 
342                                  * to the existing entry */
343     rrd_value_t   *pdp_temp;    /* prepare the pdp values to be added 
344                                  * to the cdp values */
345
346     long          *tmpl_idx;    /* index representing the settings
347                                  * transported by the tmplt index */
348     unsigned long tmpl_cnt = 2; /* time and data */
349     rrd_t         rrd;
350     time_t        current_time = 0;
351     unsigned long current_time_usec = 0;    /* microseconds part of current time */
352     char        **updvals;
353     int           schedule_smooth = 0;
354
355     /* number of elapsed PDP steps since last update */
356     unsigned long *rra_step_cnt = NULL;
357
358     int            version;     /* rrd version */
359     rrd_file_t    *rrd_file;
360     char          *arg_copy;    /* for processing the argv */
361     unsigned long *skip_update; /* RRAs to advance but not write */
362
363     /* need at least 1 arguments: data. */
364     if (argc < 1) {
365         rrd_set_error("Not enough arguments");
366         goto err_out;
367     }
368
369     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
370         goto err_free;
371     }
372     /* We are now at the beginning of the rra's */
373     rra_current = rra_begin = rrd_file->header_len;
374
375     version = atoi(rrd.stat_head->version);
376
377     initialize_time(&current_time, &current_time_usec, version);
378
379     /* get exclusive lock to whole file.
380      * lock gets removed when we close the file.
381      */
382     if (LockRRD(rrd_file->fd) != 0) {
383         rrd_set_error("could not lock RRD");
384         goto err_close;
385     }
386
387     if (allocate_data_structures(&rrd, &updvals, 
388                     &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
389                     &rra_step_cnt, &skip_update, &pdp_new) == -1) {
390         goto err_close;
391     }
392
393     /* loop through the arguments. */
394     for (arg_i = 0; arg_i < argc; arg_i++) {
395         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
396             rrd_set_error("failed duplication argv entry");
397             break;
398         }
399         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, 
400                         &current_time, &current_time_usec, pdp_temp, pdp_new,
401                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, 
402                         version, skip_update, &schedule_smooth) == -1) {
403             free(arg_copy);
404             break;
405         }
406         free(arg_copy);
407     }
408
409     free(rra_step_cnt);
410
411     /* if we got here and if there is an error and if the file has not been
412      * written to, then close things up and return. */
413     if (rrd_test_error()) {
414         goto err_free_structures;
415     }
416
417 #ifndef HAVE_MMAP
418     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
419         goto err_free_structures;
420     }
421 #endif
422
423     /* calling the smoothing code here guarantees at most one smoothing
424      * operation per rrd_update call. Unfortunately, it is possible with bulk
425      * updates, or a long-delayed update for smoothing to occur off-schedule.
426      * This really isn't critical except during the burn-in cycles. */
427     if (schedule_smooth) {
428         smooth_all_rras(&rrd, rrd_file, rra_begin);
429     }
430
431 /*    rrd_dontneed(rrd_file,&rrd); */
432     rrd_free(&rrd);
433     rrd_close(rrd_file);
434
435     free(pdp_new);
436     free(tmpl_idx);
437     free(pdp_temp);
438     free(skip_update);
439     free(updvals);
440     return 0;
441
442   err_free_structures:
443     free(pdp_new);
444     free(tmpl_idx);
445     free(pdp_temp);
446     free(skip_update);
447     free(updvals);
448   err_close:
449     rrd_close(rrd_file);
450   err_free:
451     rrd_free(&rrd);
452   err_out:
453     return -1;
454 }
455
456 /*
457  * get exclusive lock to whole file.
458  * lock gets removed when we close the file
459  *
460  * returns 0 on success
461  */
462 int LockRRD(
463     int in_file)
464 {
465     int       rcstat;
466
467     {
468 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
469         struct _stat st;
470
471         if (_fstat(in_file, &st) == 0) {
472             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
473         } else {
474             rcstat = -1;
475         }
476 #else
477         struct flock lock;
478
479         lock.l_type = F_WRLCK;  /* exclusive write lock */
480         lock.l_len = 0; /* whole file */
481         lock.l_start = 0;   /* start of file */
482         lock.l_whence = SEEK_SET;   /* end of file */
483
484         rcstat = fcntl(in_file, F_SETLK, &lock);
485 #endif
486     }
487
488     return (rcstat);
489 }
490
491 /*
492  * Allocate some important arrays used, and initialize the template.
493  *
494  * When it returns, either all of the structures are allocated
495  * or none of them are.
496  *
497  * Returns 0 on success, -1 on error.
498  */
499 static int allocate_data_structures(
500     rrd_t         *rrd, 
501     char        ***updvals, 
502     rrd_value_t  **pdp_temp, 
503     const char    *tmplt, 
504     long         **tmpl_idx, 
505     unsigned long *tmpl_cnt, 
506     unsigned long **rra_step_cnt, 
507     unsigned long **skip_update,
508     rrd_value_t  **pdp_new) 
509 {
510     unsigned i, ii;
511     if ((*updvals = (char **)malloc(sizeof(char *) 
512                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513         rrd_set_error("allocating updvals pointer array");
514         return -1;
515     }
516     if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
517                            * rrd->stat_head->ds_cnt)) == NULL) {
518         rrd_set_error("allocating pdp_temp ...");
519         goto err_free_updvals;
520     }
521     if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
522                            * rrd->stat_head->rra_cnt)) == NULL) {
523         rrd_set_error("allocating skip_update...");
524         goto err_free_pdp_temp;
525     }
526     if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
527                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
528         rrd_set_error("allocating tmpl_idx ...");
529         goto err_free_skip_update;
530     }
531     if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) 
532                            * (rrd->stat_head->rra_cnt))) == NULL) {
533         rrd_set_error("allocating rra_step_cnt...");
534         goto err_free_tmpl_idx;
535     }
536
537     /* initialize tmplt redirector */
538     /* default config example (assume DS 1 is a CDEF DS)
539        tmpl_idx[0] -> 0; (time)
540        tmpl_idx[1] -> 1; (DS 0)
541        tmpl_idx[2] -> 3; (DS 2)
542        tmpl_idx[3] -> 4; (DS 3) */
543     (*tmpl_idx)[0] = 0;    /* time */
544     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
545         if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
546             (*tmpl_idx)[ii++] = i;
547     }
548     *tmpl_cnt = ii;
549
550     if (tmplt != NULL) {
551         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
552             goto err_free_rra_step_cnt;
553         }
554     }
555
556     if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
557                           * rrd->stat_head->ds_cnt)) == NULL) {
558         rrd_set_error("allocating pdp_new ...");
559         goto err_free_rra_step_cnt;
560     }
561
562     return 0;
563
564 err_free_rra_step_cnt:
565     free(*rra_step_cnt);
566 err_free_tmpl_idx:
567     free(*tmpl_idx);
568 err_free_skip_update:
569     free(*skip_update);
570 err_free_pdp_temp:
571     free(*pdp_temp);
572 err_free_updvals:
573     free(*updvals);
574     return -1;
575 }
576
577 /*
578  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
579  *
580  * Returns 0 on success.
581  */
582 static int parse_template(
583     rrd_t *rrd, const char *tmplt, 
584     unsigned long *tmpl_cnt, long *tmpl_idx)
585 {
586     char          *dsname, *tmplt_copy;
587     unsigned int   tmpl_len, i;
588
589     *tmpl_cnt = 1;   /* the first entry is the time */
590
591     /* we should work on a writeable copy here */
592     if ((tmplt_copy = strdup(tmplt)) == NULL) {
593         rrd_set_error("error copying tmplt '%s'", tmplt);
594         return -1;
595     }
596
597     dsname = tmplt_copy;
598     tmpl_len = strlen(tmplt_copy);
599     for (i = 0; i <= tmpl_len; i++) {
600         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
601             tmplt_copy[i] = '\0';
602             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
603                 rrd_set_error("tmplt contains more DS definitions than RRD");
604                 free(tmplt_copy);
605                 return -1;
606             }
607             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
608                 rrd_set_error("unknown DS name '%s'", dsname);
609                 free(tmplt_copy);
610                 return -1;
611             } 
612             /* go to the next entry on the tmplt_copy */
613             if (i < tmpl_len)
614                 dsname = &tmplt_copy[i+1];
615         }
616     }
617     free(tmplt_copy);
618     return 0;
619 }
620
621 /*
622  * Parse an update string, updates the primary data points (PDPs)
623  * and consolidated data points (CDPs), and writes changes to the RRAs.
624  *
625  * Returns 0 on success, -1 on error.
626  */
627 static int process_arg(
628     char           *step_start, 
629     rrd_t          *rrd, 
630     rrd_file_t     *rrd_file, 
631     unsigned long  rra_begin,
632     unsigned long *rra_current,
633     time_t        *current_time,
634     unsigned long *current_time_usec,
635     rrd_value_t   *pdp_temp,
636     rrd_value_t   *pdp_new,
637     unsigned long *rra_step_cnt,
638     char         **updvals,
639     long          *tmpl_idx,
640     unsigned long  tmpl_cnt,
641     info_t       **pcdp_summary,
642     int            version,
643     unsigned long *skip_update,
644     int           *schedule_smooth)
645 {
646     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
647
648     /* a vector of future Holt-Winters seasonal coefs */
649     unsigned long elapsed_pdp_st;
650
651     double    interval, pre_int, post_int;  /* interval between this and
652                                              * the last run */
653     unsigned long proc_pdp_cnt;
654     unsigned long rra_start;
655
656     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt, 
657             current_time, current_time_usec, version) == -1) {
658         return -1;
659     }
660     /* seek to the beginning of the rra's */
661     if (*rra_current != rra_begin) {
662 #ifndef HAVE_MMAP
663         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
664             rrd_set_error("seek error in rrd");
665             return -1;
666         }
667 #endif
668         *rra_current = rra_begin;
669     }
670     rra_start = rra_begin;
671
672     interval = (double) (*current_time - rrd->live_head->last_up)
673             + (double) ((long) *current_time_usec -
674                             (long) rrd->live_head->last_up_usec) / 1e6f;
675
676     /* process the data sources and update the pdp_prep 
677      * area accordingly */
678     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
679         return -1;
680     }
681
682     elapsed_pdp_st = calculate_elapsed_steps(rrd, 
683                     *current_time, *current_time_usec, 
684                     interval, &pre_int, &post_int,
685                     &proc_pdp_cnt);
686
687     /* has a pdp_st moment occurred since the last run ? */
688     if (elapsed_pdp_st == 0) {
689         /* no we have not passed a pdp_st moment. therefore update is simple */
690         simple_update(rrd, interval, pdp_new);
691     } else {
692         /* an pdp_st has occurred. */
693         if (process_all_pdp_st(rrd, interval, 
694                                pre_int, post_int,
695                                elapsed_pdp_st, 
696                                pdp_new, pdp_temp) == -1) 
697         {
698             return -1;
699         }
700         if (update_all_cdp_prep(rrd, rra_step_cnt, 
701                                 rra_begin, rrd_file,
702                                 elapsed_pdp_st,
703                                 proc_pdp_cnt,
704                                 &last_seasonal_coef,
705                                 &seasonal_coef,
706                                 pdp_temp, rra_current,
707                                 skip_update, schedule_smooth) == -1)
708         {
709             goto err_free_coefficients;
710         }
711         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current, 
712                 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
713         {
714             goto err_free_coefficients;
715         }
716         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin, 
717                 rra_current, *current_time, skip_update, pcdp_summary) == -1)
718         {
719             goto err_free_coefficients;
720         }
721     }               /* endif a pdp_st has occurred */
722     rrd->live_head->last_up = *current_time;
723     rrd->live_head->last_up_usec = *current_time_usec;
724
725     free(seasonal_coef);
726     free(last_seasonal_coef);
727     return 0;
728
729 err_free_coefficients:
730     free(seasonal_coef);
731     free(last_seasonal_coef);
732     return -1;
733 }
734
735 /*
736  * Parse a DS string (time + colon-separated values), storing the
737  * results in current_time, current_time_usec, and updvals.
738  *
739  * Returns 0 on success, -1 on error.
740  */
741 static int parse_ds(
742     rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
743     unsigned long tmpl_cnt, time_t *current_time,
744     unsigned long *current_time_usec, int version)
745 {
746     char *p;
747     unsigned long i;
748     char timesyntax;
749
750     updvals[0] = input;
751     /* initialize all ds input to unknown except the first one
752        which has always got to be set */
753     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
754         updvals[i] = "U";
755
756     /* separate all ds elements; first must be examined separately
757        due to alternate time syntax */
758     if ((p = strchr(input, '@')) != NULL) {
759         timesyntax = '@';
760     } else if ((p = strchr(input, ':')) != NULL) {
761         timesyntax = ':';
762     } else {
763         rrd_set_error("expected timestamp not found in data source from %s",
764                  input);
765         return -1;
766     }
767     *p = '\0';
768     i = 1;
769     updvals[tmpl_idx[i++]] = p+1;
770     while (*(++p)) {
771         if (*p == ':') {
772             *p = '\0';
773             if (i < tmpl_cnt) {
774                 updvals[tmpl_idx[i++]] = p+1;
775             }
776         }
777     }
778
779     if (i != tmpl_cnt) {
780         rrd_set_error("expected %lu data source readings (got %lu) from %s",
781                  tmpl_cnt - 1, i, input);
782         return -1;
783     }
784
785     if (get_time_from_reading(rrd, timesyntax, updvals, 
786                             current_time, current_time_usec, 
787                             version) == -1) {
788         return -1;
789     }
790     return 0;
791 }
792
793 /*
794  * Parse the time in a DS string, store it in current_time and 
795  * current_time_usec and verify that it's later than the last
796  * update for this DS.
797  *
798  * Returns 0 on success, -1 on error.
799  */
800 static int get_time_from_reading(
801     rrd_t *rrd, char timesyntax, char **updvals, 
802     time_t *current_time, unsigned long *current_time_usec,
803     int version)
804 {
805     double    tmp;
806     char     *parsetime_error = NULL;
807     char     *old_locale;
808     struct rrd_time_value ds_tv;
809     struct timeval tmp_time;    /* used for time conversion */
810
811     /* get the time from the reading ... handle N */
812     if (timesyntax == '@') { /* at-style */
813         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
814             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
815             return -1;
816         }
817         if (ds_tv.type == RELATIVE_TO_END_TIME ||
818                         ds_tv.type == RELATIVE_TO_START_TIME) {
819             rrd_set_error("specifying time relative to the 'start' "
820                             "or 'end' makes no sense here: %s", updvals[0]);
821             return -1;
822         }
823         *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
824         *current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
825     } else if (strcmp(updvals[0], "N") == 0) {
826         gettimeofday(&tmp_time, 0);
827         normalize_time(&tmp_time);
828         *current_time = tmp_time.tv_sec;
829         *current_time_usec = tmp_time.tv_usec;
830     } else {
831         old_locale = setlocale(LC_NUMERIC, "C");
832         tmp = strtod(updvals[0], 0);
833         setlocale(LC_NUMERIC, old_locale);
834         *current_time = floor(tmp);
835         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
836     }
837     /* dont do any correction for old version RRDs */
838     if (version < 3)
839         *current_time_usec = 0;
840
841     if (*current_time < rrd->live_head->last_up ||
842                     (*current_time == rrd->live_head->last_up &&
843                      (long) *current_time_usec <=
844                      (long) rrd->live_head->last_up_usec)) {
845         rrd_set_error("illegal attempt to update using time %ld when "
846                           "last update time is %ld (minimum one second step)",
847                           *current_time, rrd->live_head->last_up);
848         return -1;
849     }
850     return 0;
851 }
852
853 /*
854  * Update pdp_new by interpreting the updvals according to the DS type
855  * (COUNTER, GAUGE, etc.).
856  *
857  * Returns 0 on success, -1 on error.
858  */
859 static int update_pdp_prep(
860     rrd_t *rrd, char **updvals, 
861     rrd_value_t *pdp_new, double interval) 
862 {
863     unsigned long ds_idx; 
864     int ii;
865     char     *endptr;   /* used in the conversion */
866     double rate;
867     char     *old_locale;
868     enum dst_en dst_idx;
869
870     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
871         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
872
873         /* make sure we do not build diffs with old last_ds values */
874         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
875             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
876             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
877         }
878
879         /* NOTE: DST_CDEF should never enter this if block, because
880          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
881          * accidently specified a value for the DST_CDEF. To handle this case,
882          * an extra check is required. */
883
884         if ((updvals[ds_idx+1][0] != 'U') &&
885                         (dst_idx != DST_CDEF) &&
886                         rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
887             rate = DNAN;
888
889             /* pdp_new contains rate * time ... eg the bytes transferred during
890              * the interval. Doing it this way saves a lot of math operations
891              */
892             switch (dst_idx) {
893                     case DST_COUNTER:
894                     case DST_DERIVE:
895                             for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
896                                 if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9') 
897                                                 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
898                                     rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
899                                     return -1;
900                                 }
901                             }
902                             if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
903                                 pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
904                                 if (dst_idx == DST_COUNTER) {
905                                     /* simple overflow catcher. This will fail
906                                      * terribly for non 32 or 64 bit counters
907                                      * ... are there any others in SNMP land?
908                                      */
909                                     if (pdp_new[ds_idx] < (double) 0.0)
910                                         pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
911                                     if (pdp_new[ds_idx] < (double) 0.0)
912                                         pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
913                                 }
914                                 rate = pdp_new[ds_idx] / interval;
915                             } else {
916                                 pdp_new[ds_idx] = DNAN;
917                             }
918                             break;
919                     case DST_ABSOLUTE:
920                             old_locale = setlocale(LC_NUMERIC, "C");
921                             errno = 0;
922                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
923                             setlocale(LC_NUMERIC, old_locale);
924                             if (errno > 0) {
925                                 rrd_set_error("converting '%s' to float: %s",
926                                                 updvals[ds_idx + 1], rrd_strerror(errno));
927                                 return -1;
928                             };
929                             if (endptr[0] != '\0') {
930                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
931                                          updvals[ds_idx + 1], endptr);
932                                 return -1;
933                             }
934                             rate = pdp_new[ds_idx] / interval;
935                             break;
936                     case DST_GAUGE:
937                             errno = 0;
938                             old_locale = setlocale(LC_NUMERIC, "C");
939                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
940                             setlocale(LC_NUMERIC, old_locale);
941                             if (errno) {
942                                 rrd_set_error("converting '%s' to float: %s",
943                                                 updvals[ds_idx + 1], rrd_strerror(errno));
944                                 return -1;
945                             };
946                             if (endptr[0] != '\0') {
947                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
948                                          updvals[ds_idx + 1], endptr);
949                                 return -1;
950                             }
951                             rate = pdp_new[ds_idx] / interval;
952                             break;
953                     default:
954                             rrd_set_error("rrd contains unknown DS type : '%s'",
955                                             rrd->ds_def[ds_idx].dst);
956                             return -1;
957             }
958             /* break out of this for loop if the error string is set */
959             if (rrd_test_error()) {
960                 return -1;
961             }
962             /* make sure pdp_temp is neither too large or too small
963              * if any of these occur it becomes unknown ...
964              * sorry folks ... */
965             if (!isnan(rate) &&
966                             ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
967                               rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
968                              (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
969                               rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
970                 pdp_new[ds_idx] = DNAN;
971             }
972         } else {
973             /* no news is news all the same */
974             pdp_new[ds_idx] = DNAN;
975         }
976
977
978         /* make a copy of the command line argument for the next run */
979 #ifdef DEBUG
980         fprintf(stderr, "prep ds[%lu]\t"
981                         "last_arg '%s'\t"
982                         "this_arg '%s'\t"
983                         "pdp_new %10.2f\n",
984                         ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
985 #endif
986         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
987         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
988     }
989     return 0;
990 }
991
992 /*
993  * How many PDP steps have elapsed since the last update? Returns the answer,
994  * and stores the time between the last update and the last PDP in pre_time,
995  * and the time between the last PDP and the current time in post_int.
996  */
997 static int calculate_elapsed_steps(
998     rrd_t *rrd, 
999     unsigned long current_time, 
1000     unsigned long current_time_usec,
1001     double interval,
1002     double *pre_int,
1003     double *post_int,
1004     unsigned long *proc_pdp_cnt) 
1005 {
1006     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1007     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1008                                  * time */
1009     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1010                                  * when it was last updated */
1011     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1012
1013     /* when was the current pdp started */
1014     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1015     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1016
1017     /* when did the last pdp_st occur */
1018     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1019     occu_pdp_st = current_time - occu_pdp_age;
1020
1021     if (occu_pdp_st > proc_pdp_st) {
1022         /* OK we passed the pdp_st moment */
1023         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;  /* how much of the input data
1024                                                                  * occurred before the latest
1025                                                                  * pdp_st moment*/
1026         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f;  /* adjust usecs */
1027         *post_int = occu_pdp_age;    /* how much after it */
1028         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1029     } else {
1030         *pre_int = interval;
1031         *post_int = 0;
1032     }
1033
1034     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1035
1036 #ifdef DEBUG
1037     printf("proc_pdp_age %lu\t"
1038                     "proc_pdp_st %lu\t"
1039                     "occu_pfp_age %lu\t"
1040                     "occu_pdp_st %lu\t"
1041                     "int %lf\t"
1042                     "pre_int %lf\t"
1043                     "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1044                     occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1045 #endif
1046
1047     /* compute the number of elapsed pdp_st moments */
1048     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1049 }
1050
1051 /*
1052  * Increment the PDP values by the values in pdp_new, or else initialize them.
1053  */
1054 static void simple_update(
1055     rrd_t *rrd, double interval, rrd_value_t *pdp_new) 
1056 {
1057     int i;
1058     for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1059         if (isnan(pdp_new[i])) {
1060             /* this is not really accurate if we use subsecond data arrival time
1061                should have thought of it when going subsecond resolution ...
1062                sorry next format change we will have it! */
1063             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1064         } else {
1065             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1066                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1067             } else {
1068                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1069             }
1070         }
1071 #ifdef DEBUG
1072         fprintf(stderr,
1073                         "NO PDP  ds[%i]\t"
1074                         "value %10.2f\t"
1075                         "unkn_sec %5lu\n",
1076                         i,
1077                         rrd->pdp_prep[i].scratch[PDP_val].u_val,
1078                         rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1079 #endif
1080     }
1081 }
1082
1083 /*
1084  * Call process_pdp_st for each DS.
1085  *
1086  * Returns 0 on success, -1 on error.
1087  */
1088 static int process_all_pdp_st(
1089     rrd_t *rrd, double interval, double pre_int, double post_int, 
1090     unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1091 {
1092     unsigned long ds_idx;
1093     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1094        rate*seconds which occurred up to the last run.
1095        pdp_new[] contains rate*seconds from the latest run.
1096        pdp_temp[] will contain the rate for cdp */
1097
1098     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1099         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1100                                 elapsed_pdp_st * rrd->stat_head->pdp_step, 
1101                                 pdp_new, pdp_temp) == -1) {
1102             return -1;
1103         }
1104 #ifdef DEBUG
1105         fprintf(stderr, "PDP UPD ds[%lu]\t"
1106                         "pdp_temp %10.2f\t"
1107                         "new_prep %10.2f\t"
1108                         "new_unkn_sec %5lu\n",
1109                         ds_idx, pdp_temp[ds_idx],
1110                         rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1111                         rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1112 #endif
1113     }
1114     return 0;
1115 }
1116
1117 /*
1118  * Process an update that occurs after one of the PDP moments.
1119  * Increments the PDP value, sets NAN if time greater than the
1120  * heartbeats have elapsed, processes CDEFs.
1121  *
1122  * Returns 0 on success, -1 on error.
1123  */
1124 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
1125     double pre_int, double post_int, long diff_pdp_st, 
1126     rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1127 {
1128     int i;
1129     /* update pdp_prep to the current pdp_st. */
1130     double    pre_unknown = 0.0;
1131     unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1132     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1133
1134     rpnstack_t     rpnstack;    /* used for COMPUTE DS */
1135     rpnstack_init(&rpnstack);
1136
1137
1138     if (isnan(pdp_new[ds_idx])) {
1139         /* a final bit of unknown to be added bevore calculation
1140            we use a temporary variable for this so that we
1141            don't have to turn integer lines before using the value */
1142         pre_unknown = pre_int;
1143     } else {
1144         if (isnan(scratch[PDP_val].u_val)) {
1145             scratch[PDP_val].u_val = 0;
1146         } 
1147         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1148     }
1149
1150     /* if too much of the pdp_prep is unknown we dump it */
1151     /* if the interval is larger thatn mrhb we get NAN */
1152     if ((interval > mrhb) ||
1153                     (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1154         pdp_temp[ds_idx] = DNAN;
1155     } else {
1156         pdp_temp[ds_idx] = scratch[PDP_val].u_val / 
1157                 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1158     }
1159
1160     /* process CDEF data sources; remember each CDEF DS can
1161      * only reference other DS with a lower index number */
1162     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1163         rpnp_t   *rpnp;
1164
1165         rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1166         /* substitute data values for OP_VARIABLE nodes */
1167         for (i = 0; rpnp[i].op != OP_END; i++) {
1168             if (rpnp[i].op == OP_VARIABLE) {
1169                 rpnp[i].op = OP_NUMBER;
1170                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1171             }
1172         }
1173         /* run the rpn calculator */
1174         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1175             free(rpnp);
1176             rpnstack_free(&rpnstack);
1177             return -1;
1178         }
1179     }
1180
1181     /* make pdp_prep ready for the next run */
1182     if (isnan(pdp_new[ds_idx])) {
1183         /* this is not realy accurate if we use subsecond data arival time
1184            should have thought of it when going subsecond resolution ...
1185            sorry next format change we will have it! */
1186         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1187         scratch[PDP_val].u_val = DNAN;
1188     } else {
1189         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1190         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1191     }
1192     rpnstack_free(&rpnstack);
1193     return 0;
1194 }
1195
1196 /*
1197  * Iterate over all the RRAs for a given DS and:
1198  * 1. Decide whether to schedule a smooth later
1199  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1200  * 3. Update the CDP
1201  *
1202  * Returns 0 on success, -1 on error
1203  */
1204 static int update_all_cdp_prep(
1205     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
1206     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
1207     rrd_value_t  **last_seasonal_coef, rrd_value_t  **seasonal_coef,
1208     rrd_value_t *pdp_temp, unsigned long *rra_current, 
1209     unsigned long *skip_update, int *schedule_smooth)
1210 {
1211     unsigned long rra_idx;
1212     /* index into the CDP scratch array */
1213     enum cf_en current_cf;
1214     unsigned long rra_start;
1215     /* number of rows to be updated in an RRA for a data value. */
1216     unsigned long start_pdp_offset;
1217
1218     rra_start = rra_begin;
1219     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1220         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1221         start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1222         skip_update[rra_idx] = 0;
1223         if (start_pdp_offset <= elapsed_pdp_st) {
1224             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1225                     rrd->rra_def[rra_idx].pdp_cnt + 1;
1226         } else {
1227             rra_step_cnt[rra_idx] = 0;
1228         }
1229
1230         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1231             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1232              * so that they will be correct for the next observed value; note that for
1233              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1234              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1235             if (rra_step_cnt[rra_idx] > 1) {
1236                 skip_update[rra_idx] = 1;
1237                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1238                                 elapsed_pdp_st, last_seasonal_coef);
1239                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1240                                 elapsed_pdp_st + 1, seasonal_coef);
1241             }
1242             /* periodically run a smoother for seasonal effects */
1243             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1244 #ifdef DEBUG
1245                 fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1246                                 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st, 
1247                                 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
1248 #endif
1249                 *schedule_smooth = 1;
1250             }
1251             *rra_current = rrd_tell(rrd_file);
1252         }
1253         if (rrd_test_error())
1254             return -1;
1255
1256         if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, 
1257                                 rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef, 
1258                                 current_cf) == -1) {
1259             return -1;
1260         }
1261         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1262     }
1263     return 0;
1264 }
1265
1266 /* 
1267  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1268  */
1269 static int do_schedule_smooth(
1270     rrd_t *rrd, unsigned long rra_idx,
1271     unsigned long elapsed_pdp_st)
1272 {
1273     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1274     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1275     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1276     unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1277     unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1278
1279     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1280      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1281      * really an RRA level, not a data source within RRA level parameter, but
1282      * the rra_def is read only for rrd_update (not flushed to disk). */
1283     if (*init_seasonal > BURNIN_CYCLES) {
1284         /* someone has no doubt invented a trick to deal with this wrap around,
1285          * but at least this code is clear. */
1286         if (seasonal_smooth_idx > cur_row) {
1287             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1288              * between PDP and CDP */
1289             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1290         } 
1291         /* can't rely on negative numbers because we are working with
1292          * unsigned values */
1293         return (cur_row + elapsed_pdp_st >= row_cnt
1294                         && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1295     } 
1296     /* mark off one of the burn-in cycles */
1297     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1298 }
1299
1300 /*
1301  * For a given RRA, iterate over the data sources and call the appropriate
1302  * consolidation function.
1303  *
1304  * Returns 0 on success, -1 on error.
1305  */
1306 static int update_cdp_prep(
1307     rrd_t *rrd, 
1308     unsigned long elapsed_pdp_st, 
1309     unsigned long start_pdp_offset, 
1310     unsigned long *rra_step_cnt, 
1311     int rra_idx, 
1312     rrd_value_t *pdp_temp, 
1313     rrd_value_t *last_seasonal_coef, 
1314     rrd_value_t *seasonal_coef, 
1315     int current_cf) 
1316 {
1317     unsigned long ds_idx, cdp_idx;
1318     /* update CDP_PREP areas */
1319     /* loop over data soures within each RRA */
1320     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1321
1322         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1323
1324         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1325             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf, 
1326                             pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1327                             elapsed_pdp_st, start_pdp_offset,
1328                             rrd->rra_def[rra_idx].pdp_cnt,
1329                             rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1330         } else { 
1331             /* Nothing to consolidate if there's one PDP per CDP. However, if
1332              * we've missed some PDPs, let's update null counters etc. */
1333             if (elapsed_pdp_st > 2) {
1334                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef, 
1335                                 rra_idx, ds_idx, cdp_idx, current_cf);
1336             }
1337         }
1338
1339         if (rrd_test_error())
1340             return -1;
1341     }       /* endif data sources loop */
1342     return 0;
1343 }
1344
1345 /*
1346  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1347  * primary value, secondary value, and # of unknowns.
1348  */
1349 static void update_cdp(
1350     unival *scratch,
1351     int current_cf,
1352     rrd_value_t pdp_temp_val,
1353     unsigned long rra_step_cnt,
1354     unsigned long elapsed_pdp_st,
1355     unsigned long start_pdp_offset,
1356     unsigned long pdp_cnt,
1357     rrd_value_t xff,
1358     int i, int ii)
1359 {
1360     /* shorthand variables */
1361     rrd_value_t            *cdp_val = &scratch[CDP_val].u_val;
1362     rrd_value_t    *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1363     rrd_value_t  *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1364     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1365
1366     if (rra_step_cnt) {
1367         /* If we are in this block, as least 1 CDP value will be written to
1368          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1369          * to be written, then the "fill in" value is the CDP_secondary_val
1370          * entry. */
1371         if (isnan(pdp_temp_val)) {
1372             *cdp_unkn_pdp_cnt += start_pdp_offset;
1373             *cdp_secondary_val = DNAN;
1374         } else {
1375             /* CDP_secondary value is the RRA "fill in" value for intermediary
1376              * CDP data entries. No matter the CF, the value is the same because
1377              * the average, max, min, and last of a list of identical values is
1378              * the same, namely, the value itself. */
1379             *cdp_secondary_val = pdp_temp_val;
1380         }
1381
1382         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1383             *cdp_primary_val = DNAN;
1384             if (current_cf == CF_AVERAGE) {
1385                 *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1386                                 start_pdp_offset, pdp_cnt);
1387             } else {
1388                 *cdp_val = pdp_temp_val;
1389             }
1390         } else {
1391             initialize_cdp_val(scratch, current_cf, pdp_temp_val, 
1392                             elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1393         }   /* endif meets xff value requirement for a valid value */
1394         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1395          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1396         if (isnan(pdp_temp_val))
1397             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1398         else
1399             *cdp_unkn_pdp_cnt = 0;
1400     } else {    /* rra_step_cnt[i]  == 0 */
1401
1402 #ifdef DEBUG
1403         if (isnan(*cdp_val)) {
1404             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1405                             i, ii);
1406         } else {
1407             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1408                             i, ii, *cdp_val);
1409         }
1410 #endif
1411         if (isnan(pdp_temp_val)) {
1412             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1413         } else {
1414             *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
1415         }
1416     }
1417 }
1418
1419 /*
1420  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1421  * on the type of consolidation function.
1422  */
1423 static void initialize_cdp_val(
1424     unival *scratch, 
1425     int current_cf,
1426     rrd_value_t pdp_temp_val,
1427     unsigned long elapsed_pdp_st, 
1428     unsigned long start_pdp_offset,
1429     unsigned long pdp_cnt) 
1430 {
1431     rrd_value_t cum_val, cur_val;
1432
1433     switch (current_cf) {
1434             case CF_AVERAGE:
1435                     cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1436                     cur_val = IFDNAN(pdp_temp_val, 0.0);
1437                     scratch[CDP_primary_val].u_val =
1438                             (cum_val + cur_val * start_pdp_offset) /
1439                             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1440                     scratch[CDP_val].u_val = initialize_average_carry_over(
1441                                     pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1442                     break;
1443             case CF_MAXIMUM:
1444                     cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1445                     cur_val = IFDNAN(pdp_temp_val, -DINF);
1446 #if 0
1447 #ifdef DEBUG
1448                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1449                         fprintf(stderr,
1450                                         "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1451                                         i, ii);
1452                         exit(-1);
1453                     }
1454 #endif
1455 #endif
1456                     if (cur_val > cum_val)
1457                         scratch[CDP_primary_val].u_val = cur_val;
1458                     else
1459                         scratch[CDP_primary_val].u_val = cum_val;
1460                     /* initialize carry over value */
1461                     scratch[CDP_val].u_val = pdp_temp_val;
1462                     break;
1463             case CF_MINIMUM:
1464                     cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1465                     cur_val = IFDNAN(pdp_temp_val, DINF);
1466 #if 0
1467 #ifdef DEBUG
1468                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1469                         fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1470                                         i, ii);
1471                         exit(-1);
1472                     }
1473 #endif
1474 #endif
1475                     if (cur_val < cum_val)
1476                         scratch[CDP_primary_val].u_val = cur_val;
1477                     else
1478                         scratch[CDP_primary_val].u_val = cum_val;
1479                     /* initialize carry over value */
1480                     scratch[CDP_val].u_val = pdp_temp_val;
1481                     break;
1482             case CF_LAST:
1483             default:
1484                     scratch[CDP_primary_val].u_val = pdp_temp_val;
1485                     /* initialize carry over value */
1486                     scratch[CDP_val].u_val = pdp_temp_val;
1487                     break;
1488     }
1489 }
1490
1491 /*
1492  * Update the consolidation function for Holt-Winters functions as
1493  * well as other functions that don't actually consolidate multiple
1494  * PDPs.
1495  */
1496 static void reset_cdp(
1497     rrd_t *rrd,
1498     unsigned long elapsed_pdp_st, 
1499     rrd_value_t *pdp_temp,
1500     rrd_value_t *last_seasonal_coef,
1501     rrd_value_t *seasonal_coef,
1502     int rra_idx, int ds_idx, int cdp_idx, 
1503     enum cf_en current_cf)
1504 {
1505     unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1506
1507     switch (current_cf) {
1508             case CF_AVERAGE:
1509             default:
1510                     scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1511                     scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1512                     break;
1513             case CF_SEASONAL:
1514             case CF_DEVSEASONAL:
1515                     /* need to update cached seasonal values, so they are consistent
1516                      * with the bulk update */
1517                     /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1518                      * CDP_last_deviation are the same. */
1519                     scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1520                     scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1521                     break;
1522             case CF_HWPREDICT:
1523             case CF_MHWPREDICT:
1524                     /* need to update the null_count and last_null_count.
1525                      * even do this for non-DNAN pdp_temp because the
1526                      * algorithm is not learning from batch updates. */
1527                     scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1528                     scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1529                     /* fall through */
1530             case CF_DEVPREDICT:
1531                     scratch[CDP_primary_val].u_val = DNAN;
1532                     scratch[CDP_secondary_val].u_val = DNAN;
1533                     break;
1534             case CF_FAILURES:
1535                     /* do not count missed bulk values as failures */
1536                     scratch[CDP_primary_val].u_val = 0;
1537                     scratch[CDP_secondary_val].u_val = 0;
1538                     /* need to reset violations buffer.
1539                      * could do this more carefully, but for now, just
1540                      * assume a bulk update wipes away all violations. */
1541                     erase_violations(rrd, cdp_idx, rra_idx);
1542                     break;
1543     }
1544 }
1545
1546 static rrd_value_t initialize_average_carry_over(
1547     rrd_value_t pdp_temp_val, 
1548     unsigned long elapsed_pdp_st,
1549     unsigned long start_pdp_offset,
1550     unsigned long pdp_cnt)
1551 {
1552     /* initialize carry over value */
1553     if (isnan(pdp_temp_val)) {
1554         return DNAN;
1555     } 
1556     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1557 }
1558
1559 /*
1560  * Update or initialize a CDP value based on the consolidation
1561  * function.
1562  *
1563  * Returns the new value.
1564  */
1565 static rrd_value_t calculate_cdp_val(
1566     rrd_value_t cdp_val,
1567     rrd_value_t pdp_temp_val,
1568     unsigned long elapsed_pdp_st,
1569     int current_cf, int i, int ii)
1570 {
1571     if (isnan(cdp_val)) {
1572         if (current_cf == CF_AVERAGE) {
1573             pdp_temp_val *= elapsed_pdp_st;
1574         }
1575 #ifdef DEBUG
1576         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1577                         i, ii, pdp_temp_val);
1578 #endif
1579         return pdp_temp_val;
1580     } 
1581     if (current_cf == CF_AVERAGE) 
1582         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1583     if (current_cf == CF_MINIMUM)
1584         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1585     if (current_cf == CF_MAXIMUM)
1586         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1587
1588     return pdp_temp_val;
1589 }
1590
1591 /*
1592  * For each RRA, update the seasonal values and then call update_aberrant_CF
1593  * for each data source.
1594  *
1595  * Return 0 on success, -1 on error.
1596  */
1597 static int update_aberrant_cdps(
1598     rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
1599     unsigned long *rra_current, unsigned long elapsed_pdp_st, 
1600     rrd_value_t *pdp_temp, rrd_value_t  **seasonal_coef) 
1601 {
1602     unsigned long rra_idx, ds_idx, j;
1603
1604     /* number of PDP steps since the last update that
1605      * are assigned to the first CDP to be generated
1606      * since the last update. */
1607     unsigned short scratch_idx;
1608     unsigned long  rra_start;
1609     enum cf_en     current_cf;
1610
1611     /* this loop is only entered if elapsed_pdp_st < 3 */
1612     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1613                     j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1614         rra_start = rra_begin;
1615         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1616             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1617                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1618                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1619                     if (scratch_idx == CDP_primary_val) {
1620                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1621                                         elapsed_pdp_st + 1, seasonal_coef);
1622                     } else {
1623                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1624                                         elapsed_pdp_st + 2, seasonal_coef);
1625                     }
1626                     *rra_current = rrd_tell(rrd_file);
1627                 }
1628                 if (rrd_test_error())
1629                     return -1;
1630                 /* loop over data soures within each RRA */
1631                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1632                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1633                                     rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
1634                                     rra_idx, ds_idx, scratch_idx, *seasonal_coef);
1635                 }
1636             }
1637             rra_start += rrd->rra_def[rra_idx].row_cnt 
1638                     * rrd->stat_head->ds_cnt 
1639                     * sizeof(rrd_value_t);
1640         }
1641     }
1642     return 0;
1643 }
1644
1645 /* 
1646  * Move sequentially through the file, writing one RRA at a time.  Note this
1647  * architecture divorces the computation of CDP with flushing updated RRA
1648  * entries to disk.
1649  *
1650  * Return 0 on success, -1 on error.
1651  */
1652 static int write_to_rras(
1653     rrd_t *rrd, 
1654     rrd_file_t *rrd_file, 
1655     unsigned long *rra_step_cnt,
1656     unsigned long rra_begin, 
1657     unsigned long *rra_current,
1658     time_t current_time,
1659     unsigned long *skip_update,
1660     info_t **pcdp_summary)
1661 {
1662     unsigned long rra_idx;
1663     unsigned long rra_start;    
1664     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1665     time_t    rra_time = 0; /* time of update for a RRA */
1666
1667     /* Ready to write to disk */
1668     rra_start = rra_begin;
1669     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1670         /* skip unless there's something to write */
1671         if (rra_step_cnt[rra_idx]) {
1672             /* write the first row */
1673 #ifdef DEBUG
1674             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1675 #endif
1676             rrd->rra_ptr[rra_idx].cur_row++;
1677             if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
1678                 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1679             /* position on the first row */
1680             rra_pos_tmp = rra_start +
1681                     (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1682                     sizeof(rrd_value_t);
1683             if (rra_pos_tmp != *rra_current) {
1684                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1685                     rrd_set_error("seek error in rrd");
1686                     return -1;
1687                 }
1688                 *rra_current = rra_pos_tmp;
1689             }
1690 #ifdef DEBUG
1691             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1692 #endif
1693             if (!skip_update[rra_idx]) {
1694                 if (*pcdp_summary != NULL) {
1695                     rra_time = (current_time - current_time
1696                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1697                                             rrd->stat_head->pdp_step))
1698                             - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
1699                                             rrd->stat_head->pdp_step);
1700                 }
1701                 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1702                                         pcdp_summary, rra_time) == -1)
1703                     return -1;
1704             }
1705
1706             /* write other rows of the bulk update, if any */
1707             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1708                 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
1709 #ifdef DEBUG
1710                     fprintf(stderr,
1711                                     "Wraparound for RRA %s, %lu updates left\n",
1712                                     rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
1713 #endif
1714                     /* wrap */
1715                     rrd->rra_ptr[rra_idx].cur_row = 0;
1716                     /* seek back to beginning of current rra */
1717                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1718                         rrd_set_error("seek error in rrd");
1719                         return -1;
1720                     }
1721 #ifdef DEBUG
1722                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1723                                     rrd_file->pos);
1724 #endif
1725                     *rra_current = rra_start;
1726                 }
1727                 if (!skip_update[rra_idx]) {
1728                     if (*pcdp_summary != NULL) {
1729                         rra_time = (current_time - current_time
1730                                         % (rrd->rra_def[rra_idx].pdp_cnt *
1731                                                 rrd->stat_head->pdp_step))
1732                                 -
1733                                 ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
1734                                  rrd->stat_head->pdp_step);
1735                     }
1736                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1737                                             CDP_secondary_val, pcdp_summary, rra_time) == -1)
1738                         return -1;
1739                 }
1740             }
1741         }
1742         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1743                 sizeof(rrd_value_t);
1744     }           /* RRA LOOP */
1745
1746     return 0;
1747 }
1748
1749 /*
1750  * Write out one row of values (one value per DS) to the archive.
1751  *
1752  * Returns 0 on success, -1 on error.
1753  */
1754 static int write_RRA_row(
1755     rrd_file_t *rrd_file,
1756     rrd_t *rrd,
1757     unsigned long rra_idx,
1758     unsigned long *rra_current,
1759     unsigned short CDP_scratch_idx,
1760     info_t **pcdp_summary,
1761     time_t rra_time)
1762 {
1763     unsigned long ds_idx, cdp_idx;
1764     infoval   iv;
1765
1766     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1767         /* compute the cdp index */
1768         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1769 #ifdef DEBUG
1770         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1771                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1772                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1773 #endif
1774         if (pcdp_summary != NULL) {
1775             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1776             /* append info to the return hash */
1777             *pcdp_summary = info_push(*pcdp_summary,
1778                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1779                            rrd->rra_def[rra_idx].cf_nam,
1780                            rrd->rra_def[rra_idx].pdp_cnt,
1781                            rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1782         }
1783         if (rrd_write(rrd_file,
1784              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1785              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1786             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1787             return -1;
1788         }
1789         *rra_current += sizeof(rrd_value_t);
1790     }
1791     return 0;
1792 }
1793
1794 /*
1795  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1796  *
1797  * Returns 0 on success, -1 otherwise
1798  */
1799 static int smooth_all_rras(
1800     rrd_t *rrd, 
1801     rrd_file_t *rrd_file, 
1802     unsigned long rra_begin) 
1803 {
1804     unsigned long rra_start = rra_begin;
1805     unsigned long rra_idx;
1806     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1807         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1808                         cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1809 #ifdef DEBUG
1810             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1811 #endif
1812             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1813             if (rrd_test_error())
1814                 return -1;
1815         }
1816         rra_start += rrd->rra_def[rra_idx].row_cnt
1817                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1818     }
1819     return 0;
1820 }
1821
1822 #ifndef HAVE_MMAP
1823 /*
1824  * Flush changes to disk (unless we're using mmap)
1825  *
1826  * Returns 0 on success, -1 otherwise
1827  */
1828 static int write_changes_to_disk(
1829     rrd_t *rrd, rrd_file_t *rrd_file, int version) 
1830 {
1831     /* we just need to write back the live header portion now*/
1832     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1833                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
1834                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
1835                  SEEK_SET) != 0) {
1836         rrd_set_error("seek rrd for live header writeback");
1837         return -1;
1838     }
1839     if (version >= 3) {
1840         if (rrd_write(rrd_file, rrd->live_head,
1841                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1842             rrd_set_error("rrd_write live_head to rrd");
1843             return -1;
1844         }
1845     } else {
1846         if (rrd_write(rrd_file, &rrd->live_head->last_up,
1847                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1848             rrd_set_error("rrd_write live_head to rrd");
1849             return -1;
1850         }
1851     }
1852
1853
1854     if (rrd_write(rrd_file, rrd->pdp_prep,
1855                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
1856         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
1857         rrd_set_error("rrd_write pdp_prep to rrd");
1858         return -1;
1859     }
1860
1861     if (rrd_write(rrd_file, rrd->cdp_prep,
1862                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1863                   rrd->stat_head->ds_cnt)
1864         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1865                       rrd->stat_head->ds_cnt)) {
1866
1867         rrd_set_error("rrd_write cdp_prep to rrd");
1868         return -1;
1869     }
1870
1871     if (rrd_write(rrd_file, rrd->rra_ptr,
1872                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
1873         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
1874         rrd_set_error("rrd_write rra_ptr to rrd");
1875         return -1;
1876     }
1877     return 0;
1878 }
1879 #endif