Refactored rrd_update code in preparation of finding the HW update
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
38
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
60
61 /* FUNCTION PROTOTYPES */
62
63 int       rrd_update_r(
64     const char *filename,
65     const char *tmplt,
66     int argc,
67     const char **argv);
68 int       _rrd_update(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv,
73     info_t *);
74
75 static int allocate_data_structures(
76     rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp,
77     const char *tmplt, long **tmpl_idx, unsigned long *tmpl_cnt, 
78     unsigned long **rra_step_cnt, rrd_value_t **pdp_new);
79
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81     unsigned long *tmpl_cnt, long *tmpl_idx);
82
83 static int process_arg(
84     char          *step_start, 
85     rrd_t         *rrd, 
86     rrd_file_t    *rrd_file, 
87     unsigned long  rra_begin,
88     unsigned long *rra_current,
89     time_t        *current_time,
90     unsigned long *current_time_usec,
91     rrd_value_t   *pdp_temp,
92     rrd_value_t   *pdp_new,
93     unsigned long *rra_step_cnt,
94     char         **updvals,
95     long          *tmpl_idx,
96     unsigned long  tmpl_cnt,
97     info_t       **pcdp_summary,
98     int            version,
99     int           *schedule_smooth);
100
101 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
102     unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec, 
103     int version);
104
105 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals, 
106     time_t *current_time, unsigned long *current_time_usec, int version);
107
108 static int update_pdp_prep(rrd_t *rrd, char **updvals, 
109     rrd_value_t *pdp_new, double interval);
110
111 static int calculate_elapsed_steps(rrd_t *rrd, 
112     unsigned long current_time, unsigned long current_time_usec,
113     double interval, double *pre_int, double *post_int, 
114     unsigned long *proc_pdp_cnt);
115
116 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
117
118 static int process_all_pdp_st(rrd_t *rrd, double interval, 
119     double pre_int, double post_int, unsigned long elapsed_pdp_st, 
120     rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
121
122 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
123     double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new, 
124     rrd_value_t *pdp_temp);
125
126 static int update_all_cdp_prep(
127     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
128     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
129     rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
130     rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth);
131
132 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, 
133     unsigned long elapsed_pdp_st);
134
135 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st, 
136     unsigned long start_pdp_offset, unsigned long *rra_step_cnt, 
137     int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
138     rrd_value_t *seasonal_coef, int current_cf);
139
140 static void update_cdp(unival *scratch, int current_cf, 
141     rrd_value_t pdp_temp_val, unsigned long rra_step_cnt, 
142     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
143     unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
144
145 static void initialize_cdp_val(unival *scratch, int current_cf,
146     rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st, 
147     unsigned long start_pdp_offset, unsigned long pdp_cnt);
148
149 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st, 
150     rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
151     rrd_value_t *seasonal_coef, 
152     int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
153
154 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
155     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
156     unsigned long pdp_cnt);
157
158 static rrd_value_t calculate_cdp_val(
159     rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
160     unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
161
162 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file, 
163     unsigned long rra_begin, unsigned long *rra_current, 
164     unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
165
166 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
167     unsigned long *rra_step_cnt, unsigned long rra_begin, 
168     unsigned long *rra_current, time_t current_time, info_t **pcdp_summary);
169
170 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
171     unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
172     time_t *rra_time);
173
174 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
175     unsigned long rra_begin);
176
177 #ifndef HAVE_MMAP
178 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file, 
179     int version);
180 #endif
181
182 /*
183  * normalize time as returned by gettimeofday. usec part must
184  * be always >= 0
185  */
186 static inline void normalize_time(
187     struct timeval *t)
188 {
189     if (t->tv_usec < 0) {
190         t->tv_sec--;
191         t->tv_usec += 1e6L;
192     }
193 }
194
195 /*
196  * Sets current_time and current_time_usec based on the current time.
197  * current_time_usec is set to 0 if the version number is 1 or 2.
198  */
199 static inline void initialize_time(
200     time_t *current_time, unsigned long *current_time_usec,
201     int version) 
202 {
203     struct timeval tmp_time;    /* used for time conversion */
204
205     gettimeofday(&tmp_time, 0);
206     normalize_time(&tmp_time);
207     *current_time = tmp_time.tv_sec;
208     if (version >= 3) {
209         *current_time_usec = tmp_time.tv_usec;
210     } else {
211         *current_time_usec = 0;
212     }
213 }
214
215 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
216
217 info_t   *rrd_update_v(
218     int argc,
219     char **argv)
220 {
221     char     *tmplt = NULL;
222     info_t   *result = NULL;
223     infoval   rc;
224     struct option long_options[] = {
225         {"template", required_argument, 0, 't'},
226         {0, 0, 0, 0}
227     };
228
229     rc.u_int = -1;
230     optind = 0;
231     opterr = 0;         /* initialize getopt */
232
233     while (1) {
234         int       option_index = 0;
235         int       opt;
236
237         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
238
239         if (opt == EOF)
240             break;
241
242         switch (opt) {
243         case 't':
244             tmplt = optarg;
245             break;
246
247         case '?':
248             rrd_set_error("unknown option '%s'", argv[optind - 1]);
249             goto end_tag;
250         }
251     }
252
253     /* need at least 2 arguments: filename, data. */
254     if (argc - optind < 2) {
255         rrd_set_error("Not enough arguments");
256         goto end_tag;
257     }
258     rc.u_int = 0;
259     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
260     rc.u_int = _rrd_update(argv[optind], tmplt,
261                            argc - optind - 1,
262                            (const char **) (argv + optind + 1), result);
263     result->value.u_int = rc.u_int;
264   end_tag:
265     return result;
266 }
267
268 int rrd_update(
269     int argc,
270     char **argv)
271 {
272     struct option long_options[] = {
273         {"template", required_argument, 0, 't'},
274         {0, 0, 0, 0}
275     };
276     int       option_index = 0;
277     int       opt;
278     char     *tmplt = NULL;
279     int       rc;
280
281     optind = 0;
282     opterr = 0;         /* initialize getopt */
283
284     while (1) {
285         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
286
287         if (opt == EOF)
288             break;
289
290         switch (opt) {
291         case 't':
292             tmplt = strdup(optarg);
293             break;
294
295         case '?':
296             rrd_set_error("unknown option '%s'", argv[optind - 1]);
297             return (-1);
298         }
299     }
300
301     /* need at least 2 arguments: filename, data. */
302     if (argc - optind < 2) {
303         rrd_set_error("Not enough arguments");
304
305         return -1;
306     }
307
308     rc = rrd_update_r(argv[optind], tmplt,
309                       argc - optind - 1, (const char **) (argv + optind + 1));
310     free(tmplt);
311     return rc;
312 }
313
314 int rrd_update_r(
315     const char *filename,
316     const char *tmplt,
317     int argc,
318     const char **argv)
319 {
320     return _rrd_update(filename, tmplt, argc, argv, NULL);
321 }
322
323 int _rrd_update(
324     const char *filename,
325     const char *tmplt,
326     int argc,
327     const char **argv,
328     info_t *pcdp_summary)
329 {
330
331     int           arg_i = 2;
332
333     unsigned long rra_begin;    /* byte pointer to the rra
334                                  * area in the rrd file.  this
335                                  * pointer never changes value */
336     unsigned long rra_current;  /* byte pointer to the current write
337                                  * spot in the rrd file. */
338     rrd_value_t   *pdp_new;     /* prepare the incoming data to be added 
339                                  * to the existing entry */
340     rrd_value_t   *pdp_temp;    /* prepare the pdp values to be added 
341                                  * to the cdp values */
342
343     long          *tmpl_idx;    /* index representing the settings
344                                  * transported by the tmplt index */
345     unsigned long tmpl_cnt = 2; /* time and data */
346     rrd_t         rrd;
347     time_t        current_time = 0;
348     unsigned long current_time_usec = 0;    /* microseconds part of current time */
349     char        **updvals;
350     int           schedule_smooth = 0;
351
352     /* number of elapsed PDP steps since last update */
353     unsigned long *rra_step_cnt = NULL;
354
355     int            version;     /* rrd version */
356     rrd_file_t    *rrd_file;
357     char          *arg_copy;    /* for processing the argv */
358
359     /* need at least 1 arguments: data. */
360     if (argc < 1) {
361         rrd_set_error("Not enough arguments");
362         goto err_out;
363     }
364
365     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
366         goto err_free;
367     }
368     /* We are now at the beginning of the rra's */
369     rra_current = rra_begin = rrd_file->header_len;
370
371     version = atoi(rrd.stat_head->version);
372
373     initialize_time(&current_time, &current_time_usec, version);
374
375     /* get exclusive lock to whole file.
376      * lock gets removed when we close the file.
377      */
378     if (LockRRD(rrd_file->fd) != 0) {
379         rrd_set_error("could not lock RRD");
380         goto err_close;
381     }
382
383     if (allocate_data_structures(&rrd, &updvals, 
384                     &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
385                     &rra_step_cnt, &pdp_new) == -1) {
386         goto err_close;
387     }
388
389     /* loop through the arguments. */
390     for (arg_i = 0; arg_i < argc; arg_i++) {
391         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
392             rrd_set_error("failed duplication argv entry");
393             break;
394         }
395         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, 
396                         &current_time, &current_time_usec, pdp_temp, pdp_new,
397                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, 
398                         version, &schedule_smooth) == -1) {
399             free(arg_copy);
400             break;
401         }
402         free(arg_copy);
403     }
404
405     free(rra_step_cnt);
406
407     /* if we got here and if there is an error and if the file has not been
408      * written to, then close things up and return. */
409     if (rrd_test_error()) {
410         goto err_free_structures;
411     }
412
413 #ifndef HAVE_MMAP
414     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
415         goto err_free_structures;
416     }
417 #endif
418
419     /* calling the smoothing code here guarantees at most one smoothing
420      * operation per rrd_update call. Unfortunately, it is possible with bulk
421      * updates, or a long-delayed update for smoothing to occur off-schedule.
422      * This really isn't critical except during the burn-in cycles. */
423     if (schedule_smooth) {
424         smooth_all_rras(&rrd, rrd_file, rra_begin);
425     }
426
427 /*    rrd_dontneed(rrd_file,&rrd); */
428     rrd_free(&rrd);
429     rrd_close(rrd_file);
430
431     free(pdp_new);
432     free(tmpl_idx);
433     free(pdp_temp);
434     free(updvals);
435     return 0;
436
437   err_free_structures:
438     free(pdp_new);
439     free(tmpl_idx);
440     free(pdp_temp);
441     free(updvals);
442   err_close:
443     rrd_close(rrd_file);
444   err_free:
445     rrd_free(&rrd);
446   err_out:
447     return -1;
448 }
449
450 /*
451  * get exclusive lock to whole file.
452  * lock gets removed when we close the file
453  *
454  * returns 0 on success
455  */
456 int LockRRD(
457     int in_file)
458 {
459     int       rcstat;
460
461     {
462 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
463         struct _stat st;
464
465         if (_fstat(in_file, &st) == 0) {
466             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
467         } else {
468             rcstat = -1;
469         }
470 #else
471         struct flock lock;
472
473         lock.l_type = F_WRLCK;  /* exclusive write lock */
474         lock.l_len = 0; /* whole file */
475         lock.l_start = 0;   /* start of file */
476         lock.l_whence = SEEK_SET;   /* end of file */
477
478         rcstat = fcntl(in_file, F_SETLK, &lock);
479 #endif
480     }
481
482     return (rcstat);
483 }
484
485 /*
486  * Allocate some important arrays used, and initialize the template.
487  *
488  * When it returns, either all of the structures are allocated
489  * or none of them are.
490  *
491  * Returns 0 on success, -1 on error.
492  */
493 static int allocate_data_structures(
494     rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
495     long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
496     rrd_value_t **pdp_new) 
497 {
498     unsigned i, ii;
499     if ((*updvals = (char **)malloc(sizeof(char *) 
500                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
501         rrd_set_error("allocating updvals pointer array");
502         return -1;
503     }
504
505     if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
506                            * rrd->stat_head->ds_cnt)) == NULL) {
507         rrd_set_error("allocating pdp_temp ...");
508         goto err_free_updvals;
509     }
510
511     if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
512                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513         rrd_set_error("allocating tmpl_idx ...");
514         goto err_free_pdp_temp;
515     }
516     if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) 
517                            * (rrd->stat_head->rra_cnt))) == NULL) {
518         rrd_set_error("allocating rra_step_cnt...");
519         goto err_free_tmpl_idx;
520     }
521
522     /* initialize tmplt redirector */
523     /* default config example (assume DS 1 is a CDEF DS)
524        tmpl_idx[0] -> 0; (time)
525        tmpl_idx[1] -> 1; (DS 0)
526        tmpl_idx[2] -> 3; (DS 2)
527        tmpl_idx[3] -> 4; (DS 3) */
528     (*tmpl_idx)[0] = 0;    /* time */
529     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
530         if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
531             (*tmpl_idx)[ii++] = i;
532     }
533     *tmpl_cnt = ii;
534
535     if (tmplt != NULL) {
536         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
537             goto err_free_tmpl_idx;
538         }
539     }
540
541     if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
542                           * rrd->stat_head->ds_cnt)) == NULL) {
543         rrd_set_error("allocating pdp_new ...");
544         goto err_free_tmpl_idx;
545     }
546
547     return 0;
548
549 err_free_tmpl_idx:
550     free(*tmpl_idx);
551 err_free_pdp_temp:
552     free(*pdp_temp);
553 err_free_updvals:
554     free(*updvals);
555     return -1;
556 }
557
558 /*
559  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
560  *
561  * Returns 0 on success.
562  */
563 static int parse_template(
564     rrd_t *rrd, const char *tmplt, 
565     unsigned long *tmpl_cnt, long *tmpl_idx)
566 {
567     char          *dsname, *tmplt_copy;
568     unsigned int   tmpl_len, i;
569
570     *tmpl_cnt = 1;   /* the first entry is the time */
571
572     /* we should work on a writeable copy here */
573     if ((tmplt_copy = strdup(tmplt)) == NULL) {
574         rrd_set_error("error copying tmplt '%s'", tmplt);
575         return -1;
576     }
577
578     dsname = tmplt_copy;
579     tmpl_len = strlen(tmplt_copy);
580     for (i = 0; i <= tmpl_len; i++) {
581         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
582             tmplt_copy[i] = '\0';
583             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
584                 rrd_set_error("tmplt contains more DS definitions than RRD");
585                 free(tmplt_copy);
586                 return -1;
587             }
588             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
589                 rrd_set_error("unknown DS name '%s'", dsname);
590                 free(tmplt_copy);
591                 return -1;
592             } 
593             /* go to the next entry on the tmplt_copy */
594             if (i < tmpl_len)
595                 dsname = &tmplt_copy[i+1];
596         }
597     }
598     free(tmplt_copy);
599     return 0;
600 }
601
602 /*
603  * Parse an update string, updates the primary data points (PDPs)
604  * and consolidated data points (CDPs), and writes changes to the RRAs.
605  *
606  * Returns 0 on success, -1 on error.
607  */
608 static int process_arg(
609     char           *step_start, 
610     rrd_t          *rrd, 
611     rrd_file_t     *rrd_file, 
612     unsigned long  rra_begin,
613     unsigned long *rra_current,
614     time_t        *current_time,
615     unsigned long *current_time_usec,
616     rrd_value_t   *pdp_temp,
617     rrd_value_t   *pdp_new,
618     unsigned long *rra_step_cnt,
619     char         **updvals,
620     long          *tmpl_idx,
621     unsigned long  tmpl_cnt,
622     info_t       **pcdp_summary,
623     int            version,
624     int           *schedule_smooth)
625 {
626     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
627
628     /* a vector of future Holt-Winters seasonal coefs */
629     unsigned long elapsed_pdp_st;
630
631     double    interval, pre_int, post_int;  /* interval between this and
632                                              * the last run */
633     unsigned long proc_pdp_cnt;
634     unsigned long rra_start;
635
636     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt, 
637             current_time, current_time_usec, version) == -1) {
638         return -1;
639     }
640     /* seek to the beginning of the rra's */
641     if (*rra_current != rra_begin) {
642 #ifndef HAVE_MMAP
643         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
644             rrd_set_error("seek error in rrd");
645             return -1;
646         }
647 #endif
648         *rra_current = rra_begin;
649     }
650     rra_start = rra_begin;
651
652     interval = (double) (*current_time - rrd->live_head->last_up)
653             + (double) ((long) *current_time_usec -
654                             (long) rrd->live_head->last_up_usec) / 1e6f;
655
656     /* process the data sources and update the pdp_prep 
657      * area accordingly */
658     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
659         return -1;
660     }
661
662     elapsed_pdp_st = calculate_elapsed_steps(rrd, 
663                     *current_time, *current_time_usec, 
664                     interval, &pre_int, &post_int,
665                     &proc_pdp_cnt);
666
667     /* has a pdp_st moment occurred since the last run ? */
668     if (elapsed_pdp_st == 0) {
669         /* no we have not passed a pdp_st moment. therefore update is simple */
670         simple_update(rrd, interval, pdp_new);
671     } else {
672         /* an pdp_st has occurred. */
673         if (process_all_pdp_st(rrd, interval, 
674                                pre_int, post_int,
675                                elapsed_pdp_st, 
676                                pdp_new, pdp_temp) == -1) 
677         {
678             return -1;
679         }
680         if (update_all_cdp_prep(rrd, rra_step_cnt, 
681                                 rra_begin, rrd_file,
682                                 elapsed_pdp_st,
683                                 proc_pdp_cnt,
684                                 &last_seasonal_coef,
685                                 &seasonal_coef,
686                                 pdp_temp, rra_current,
687                                 schedule_smooth) == -1)
688         {
689             goto err_free_coefficients;
690         }
691         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current, 
692                 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
693         {
694             goto err_free_coefficients;
695         }
696         if (write_to_rras(rrd, rrd_file, 
697                 rra_step_cnt, rra_begin, rra_current, 
698                 *current_time, pcdp_summary) == -1)
699         {
700             goto err_free_coefficients;
701         }
702     }               /* endif a pdp_st has occurred */
703     rrd->live_head->last_up = *current_time;
704     rrd->live_head->last_up_usec = *current_time_usec;
705
706     free(seasonal_coef);
707     free(last_seasonal_coef);
708     return 0;
709
710 err_free_coefficients:
711     free(seasonal_coef);
712     free(last_seasonal_coef);
713     return -1;
714 }
715
716 /*
717  * Parse a DS string (time + colon-separated values), storing the
718  * results in current_time, current_time_usec, and updvals.
719  *
720  * Returns 0 on success, -1 on error.
721  */
722 static int parse_ds(
723     rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
724     unsigned long tmpl_cnt, time_t *current_time,
725     unsigned long *current_time_usec, int version)
726 {
727     char *p;
728     unsigned long i;
729     char timesyntax;
730
731     updvals[0] = input;
732     /* initialize all ds input to unknown except the first one
733        which has always got to be set */
734     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
735         updvals[i] = "U";
736
737     /* separate all ds elements; first must be examined separately
738        due to alternate time syntax */
739     if ((p = strchr(input, '@')) != NULL) {
740         timesyntax = '@';
741     } else if ((p = strchr(input, ':')) != NULL) {
742         timesyntax = ':';
743     } else {
744         rrd_set_error("expected timestamp not found in data source from %s",
745                  input);
746         return -1;
747     }
748     *p = '\0';
749     i = 1;
750     updvals[tmpl_idx[i++]] = p+1;
751     while (*(++p)) {
752         if (*p == ':') {
753             *p = '\0';
754             if (i < tmpl_cnt) {
755                 updvals[tmpl_idx[i++]] = p+1;
756             }
757         }
758     }
759
760     if (i != tmpl_cnt) {
761         rrd_set_error("expected %lu data source readings (got %lu) from %s",
762                  tmpl_cnt - 1, i, input);
763         return -1;
764     }
765
766     if (get_time_from_reading(rrd, timesyntax, updvals, 
767                             current_time, current_time_usec, 
768                             version) == -1) {
769         return -1;
770     }
771     return 0;
772 }
773
774 /*
775  * Parse the time in a DS string, store it in current_time and 
776  * current_time_usec and verify that it's later than the last
777  * update for this DS.
778  *
779  * Returns 0 on success, -1 on error.
780  */
781 static int get_time_from_reading(
782     rrd_t *rrd, char timesyntax, char **updvals, 
783     time_t *current_time, unsigned long *current_time_usec,
784     int version)
785 {
786     double    tmp;
787     char     *parsetime_error = NULL;
788     char     *old_locale;
789     struct rrd_time_value ds_tv;
790     struct timeval tmp_time;    /* used for time conversion */
791
792     /* get the time from the reading ... handle N */
793     if (timesyntax == '@') { /* at-style */
794         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
795             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
796             return -1;
797         }
798         if (ds_tv.type == RELATIVE_TO_END_TIME ||
799                         ds_tv.type == RELATIVE_TO_START_TIME) {
800             rrd_set_error("specifying time relative to the 'start' "
801                             "or 'end' makes no sense here: %s", updvals[0]);
802             return -1;
803         }
804         *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
805         *current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
806     } else if (strcmp(updvals[0], "N") == 0) {
807         gettimeofday(&tmp_time, 0);
808         normalize_time(&tmp_time);
809         *current_time = tmp_time.tv_sec;
810         *current_time_usec = tmp_time.tv_usec;
811     } else {
812         old_locale = setlocale(LC_NUMERIC, "C");
813         tmp = strtod(updvals[0], 0);
814         setlocale(LC_NUMERIC, old_locale);
815         *current_time = floor(tmp);
816         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
817     }
818     /* dont do any correction for old version RRDs */
819     if (version < 3)
820         *current_time_usec = 0;
821
822     if (*current_time < rrd->live_head->last_up ||
823                     (*current_time == rrd->live_head->last_up &&
824                      (long) *current_time_usec <=
825                      (long) rrd->live_head->last_up_usec)) {
826         rrd_set_error("illegal attempt to update using time %ld when "
827                           "last update time is %ld (minimum one second step)",
828                           *current_time, rrd->live_head->last_up);
829         return -1;
830     }
831     return 0;
832 }
833
834 /*
835  * Update pdp_new by interpreting the updvals according to the DS type
836  * (COUNTER, GAUGE, etc.).
837  *
838  * Returns 0 on success, -1 on error.
839  */
840 static int update_pdp_prep(
841     rrd_t *rrd, char **updvals, 
842     rrd_value_t *pdp_new, double interval) 
843 {
844     unsigned long ds_idx; 
845     int ii;
846     char     *endptr;   /* used in the conversion */
847     double rate;
848     char     *old_locale;
849     enum dst_en dst_idx;
850
851     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
852         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
853
854         /* make sure we do not build diffs with old last_ds values */
855         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
856             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
857             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
858         }
859
860         /* NOTE: DST_CDEF should never enter this if block, because
861          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
862          * accidently specified a value for the DST_CDEF. To handle this case,
863          * an extra check is required. */
864
865         if ((updvals[ds_idx+1][0] != 'U') &&
866                         (dst_idx != DST_CDEF) &&
867                         rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
868             rate = DNAN;
869
870             /* pdp_new contains rate * time ... eg the bytes transferred during
871              * the interval. Doing it this way saves a lot of math operations
872              */
873             switch (dst_idx) {
874                     case DST_COUNTER:
875                     case DST_DERIVE:
876                             for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
877                                 if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9') 
878                                                 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
879                                     rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
880                                     return -1;
881                                 }
882                             }
883                             if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
884                                 pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
885                                 if (dst_idx == DST_COUNTER) {
886                                     /* simple overflow catcher. This will fail
887                                      * terribly for non 32 or 64 bit counters
888                                      * ... are there any others in SNMP land?
889                                      */
890                                     if (pdp_new[ds_idx] < (double) 0.0)
891                                         pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
892                                     if (pdp_new[ds_idx] < (double) 0.0)
893                                         pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
894                                 }
895                                 rate = pdp_new[ds_idx] / interval;
896                             } else {
897                                 pdp_new[ds_idx] = DNAN;
898                             }
899                             break;
900                     case DST_ABSOLUTE:
901                             old_locale = setlocale(LC_NUMERIC, "C");
902                             errno = 0;
903                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
904                             setlocale(LC_NUMERIC, old_locale);
905                             if (errno > 0) {
906                                 rrd_set_error("converting '%s' to float: %s",
907                                                 updvals[ds_idx + 1], rrd_strerror(errno));
908                                 return -1;
909                             };
910                             if (endptr[0] != '\0') {
911                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
912                                          updvals[ds_idx + 1], endptr);
913                                 return -1;
914                             }
915                             rate = pdp_new[ds_idx] / interval;
916                             break;
917                     case DST_GAUGE:
918                             errno = 0;
919                             old_locale = setlocale(LC_NUMERIC, "C");
920                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
921                             setlocale(LC_NUMERIC, old_locale);
922                             if (errno) {
923                                 rrd_set_error("converting '%s' to float: %s",
924                                                 updvals[ds_idx + 1], rrd_strerror(errno));
925                                 return -1;
926                             };
927                             if (endptr[0] != '\0') {
928                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
929                                          updvals[ds_idx + 1], endptr);
930                                 return -1;
931                             }
932                             rate = pdp_new[ds_idx] / interval;
933                             break;
934                     default:
935                             rrd_set_error("rrd contains unknown DS type : '%s'",
936                                             rrd->ds_def[ds_idx].dst);
937                             return -1;
938             }
939             /* break out of this for loop if the error string is set */
940             if (rrd_test_error()) {
941                 return -1;
942             }
943             /* make sure pdp_temp is neither too large or too small
944              * if any of these occur it becomes unknown ...
945              * sorry folks ... */
946             if (!isnan(rate) &&
947                             ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
948                               rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
949                              (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
950                               rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
951                 pdp_new[ds_idx] = DNAN;
952             }
953         } else {
954             /* no news is news all the same */
955             pdp_new[ds_idx] = DNAN;
956         }
957
958
959         /* make a copy of the command line argument for the next run */
960 #ifdef DEBUG
961         fprintf(stderr, "prep ds[%lu]\t"
962                         "last_arg '%s'\t"
963                         "this_arg '%s'\t"
964                         "pdp_new %10.2f\n",
965                         ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
966 #endif
967         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
968         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
969     }
970     return 0;
971 }
972
973 /*
974  * How many PDP steps have elapsed since the last update? Returns the answer,
975  * and stores the time between the last update and the last PDP in pre_time,
976  * and the time between the last PDP and the current time in post_int.
977  */
978 static int calculate_elapsed_steps(
979     rrd_t *rrd, 
980     unsigned long current_time, 
981     unsigned long current_time_usec,
982     double interval,
983     double *pre_int,
984     double *post_int,
985     unsigned long *proc_pdp_cnt) 
986 {
987
988     unsigned long proc_pdp_st;  /* which pdp_st was the last
989                                  * to be processed */
990     unsigned long occu_pdp_st;  /* when was the pdp_st
991                                  * before the last update
992                                  * time */
993     unsigned long proc_pdp_age; /* how old was the data in
994                                  * the pdp prep area when it
995                                  * was last updated */
996     unsigned long occu_pdp_age; /* how long ago was the last
997                                  * pdp_step time */
998
999     /* when was the current pdp started */
1000     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1001     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1002
1003     /* when did the last pdp_st occur */
1004     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1005     occu_pdp_st = current_time - occu_pdp_age;
1006
1007     if (occu_pdp_st > proc_pdp_st) {
1008         /* OK we passed the pdp_st moment */
1009         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;  /* how much of the input data
1010                                                                  * occurred before the latest
1011                                                                  * pdp_st moment*/
1012         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f;  /* adjust usecs */
1013         *post_int = occu_pdp_age;    /* how much after it */
1014         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1015     } else {
1016         *pre_int = interval;
1017         *post_int = 0;
1018     }
1019
1020     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1021
1022 #ifdef DEBUG
1023     printf("proc_pdp_age %lu\t"
1024                     "proc_pdp_st %lu\t"
1025                     "occu_pfp_age %lu\t"
1026                     "occu_pdp_st %lu\t"
1027                     "int %lf\t"
1028                     "pre_int %lf\t"
1029                     "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1030                     occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1031 #endif
1032
1033     /* compute the number of elapsed pdp_st moments */
1034     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1035 }
1036
1037 /*
1038  * Increment the PDP values by the values in pdp_new, or else initialize them.
1039  */
1040 static void simple_update(
1041     rrd_t *rrd, double interval, rrd_value_t *pdp_new) 
1042 {
1043     int i;
1044     for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1045         if (isnan(pdp_new[i])) {
1046             /* this is not really accurate if we use subsecond data arrival time
1047                should have thought of it when going subsecond resolution ...
1048                sorry next format change we will have it! */
1049             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1050         } else {
1051             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1052                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1053             } else {
1054                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1055             }
1056         }
1057 #ifdef DEBUG
1058         fprintf(stderr,
1059                         "NO PDP  ds[%i]\t"
1060                         "value %10.2f\t"
1061                         "unkn_sec %5lu\n",
1062                         i,
1063                         rrd->pdp_prep[i].scratch[PDP_val].u_val,
1064                         rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1065 #endif
1066     }
1067 }
1068
1069 /*
1070  * Call process_pdp_st for each DS.
1071  *
1072  * Returns 0 on success, -1 on error.
1073  */
1074 static int process_all_pdp_st(
1075     rrd_t *rrd, double interval, double pre_int, double post_int, 
1076     unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1077 {
1078     unsigned long ds_idx;
1079     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1080        rate*seconds which occurred up to the last run.
1081        pdp_new[] contains rate*seconds from the latest run.
1082        pdp_temp[] will contain the rate for cdp */
1083
1084     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1085         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1086                                 elapsed_pdp_st * rrd->stat_head->pdp_step, 
1087                                 pdp_new, pdp_temp) == -1) {
1088             return -1;
1089         }
1090 #ifdef DEBUG
1091         fprintf(stderr, "PDP UPD ds[%lu]\t"
1092                         "pdp_temp %10.2f\t"
1093                         "new_prep %10.2f\t"
1094                         "new_unkn_sec %5lu\n",
1095                         ds_idx, pdp_temp[ds_idx],
1096                         rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1097                         rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1098 #endif
1099     }
1100     return 0;
1101 }
1102
1103 /*
1104  * Process an update that occurs after one of the PDP moments.
1105  * Increments the PDP value, sets NAN if time greater than the
1106  * heartbeats have elapsed, processes CDEFs.
1107  *
1108  * Returns 0 on success, -1 on error.
1109  */
1110 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
1111     double pre_int, double post_int, long diff_pdp_st, 
1112     rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1113 {
1114     int i;
1115     /* update pdp_prep to the current pdp_st. */
1116     double    pre_unknown = 0.0;
1117     unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1118     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1119
1120     rpnstack_t     rpnstack;    /* used for COMPUTE DS */
1121     rpnstack_init(&rpnstack);
1122
1123
1124     if (isnan(pdp_new[ds_idx])) {
1125         /* a final bit of unknown to be added bevore calculation
1126            we use a temporary variable for this so that we
1127            don't have to turn integer lines before using the value */
1128         pre_unknown = pre_int;
1129     } else {
1130         if (isnan(scratch[PDP_val].u_val)) {
1131             scratch[PDP_val].u_val = 0;
1132         } 
1133         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1134     }
1135
1136     /* if too much of the pdp_prep is unknown we dump it */
1137     /* if the interval is larger thatn mrhb we get NAN */
1138     if ((interval > mrhb) ||
1139                     (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1140         pdp_temp[ds_idx] = DNAN;
1141     } else {
1142         pdp_temp[ds_idx] = scratch[PDP_val].u_val / 
1143                 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1144     }
1145
1146     /* process CDEF data sources; remember each CDEF DS can
1147      * only reference other DS with a lower index number */
1148     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1149         rpnp_t   *rpnp;
1150
1151         rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1152         /* substitute data values for OP_VARIABLE nodes */
1153         for (i = 0; rpnp[i].op != OP_END; i++) {
1154             if (rpnp[i].op == OP_VARIABLE) {
1155                 rpnp[i].op = OP_NUMBER;
1156                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1157             }
1158         }
1159         /* run the rpn calculator */
1160         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1161             free(rpnp);
1162             rpnstack_free(&rpnstack);
1163             return -1;
1164         }
1165     }
1166
1167     /* make pdp_prep ready for the next run */
1168     if (isnan(pdp_new[ds_idx])) {
1169         /* this is not realy accurate if we use subsecond data arival time
1170            should have thought of it when going subsecond resolution ...
1171            sorry next format change we will have it! */
1172         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1173         scratch[PDP_val].u_val = DNAN;
1174     } else {
1175         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1176         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1177     }
1178     rpnstack_free(&rpnstack);
1179     return 0;
1180 }
1181
1182 /*
1183  * Iterate over all the RRAs for a given DS and:
1184  * 1. Decide whether to schedule a smooth later
1185  * 2. Shift the seasonal array if it's a bulk update
1186  * 3. Update the CDP
1187  *
1188  * Returns 0 on success, -1 on error
1189  */
1190 static int update_all_cdp_prep(
1191     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
1192     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
1193     rrd_value_t  **last_seasonal_coef, rrd_value_t  **seasonal_coef,
1194     rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth) 
1195 {
1196     unsigned long rra_idx;
1197     /* index into the CDP scratch array */
1198     enum cf_en current_cf;
1199     unsigned long rra_start;
1200     /* number of rows to be updated in an RRA for a data value. */
1201     unsigned long start_pdp_offset;
1202
1203     rra_start = rra_begin;
1204     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1205         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1206         start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1207         if (start_pdp_offset <= elapsed_pdp_st) {
1208             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1209                     rrd->rra_def[rra_idx].pdp_cnt + 1;
1210         } else {
1211             rra_step_cnt[rra_idx] = 0;
1212         }
1213
1214         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1215             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1216              * so that they will be correct for the next observed value; note that for
1217              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1218              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1219             if (rra_step_cnt[rra_idx] > 2) {
1220                 /* skip update by resetting rra_step_cnt[rra_idx], note that this is not data
1221                  * source specific; this is due to the bulk update, not a DNAN value
1222                  * for the specific data source. */
1223                 rra_step_cnt[rra_idx] = 0;
1224                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1225                                 elapsed_pdp_st, last_seasonal_coef);
1226                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1227                                 elapsed_pdp_st + 1, seasonal_coef);
1228             }
1229             /* periodically run a smoother for seasonal effects */
1230             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1231 #ifdef DEBUG
1232                 fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1233                                 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st, 
1234                                 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
1235 #endif
1236                 *schedule_smooth = 1;
1237             }
1238             *rra_current = rrd_tell(rrd_file);
1239         }
1240         /* if cf is DEVSEASONAL or SEASONAL */
1241         if (rrd_test_error())
1242             return -1;
1243
1244         if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, 
1245                                 rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef, 
1246                                 current_cf) == -1) {
1247             return -1;
1248         }
1249         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1250     }
1251     return 0;
1252 }
1253
1254 /* 
1255  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1256  */
1257 static int do_schedule_smooth(
1258     rrd_t *rrd, unsigned long rra_idx,
1259     unsigned long elapsed_pdp_st)
1260 {
1261     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1262     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1263     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1264     unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1265     unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1266
1267     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1268      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1269      * really an RRA level, not a data source within RRA level parameter, but
1270      * the rra_def is read only for rrd_update (not flushed to disk). */
1271     if (*init_seasonal > BURNIN_CYCLES) {
1272         /* someone has no doubt invented a trick to deal with this wrap around,
1273          * but at least this code is clear. */
1274         if (seasonal_smooth_idx > cur_row) {
1275             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1276              * between PDP and CDP */
1277             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1278         } 
1279         /* can't rely on negative numbers because we are working with
1280          * unsigned values */
1281         return (cur_row + elapsed_pdp_st >= row_cnt
1282                         && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1283     } 
1284     /* mark off one of the burn-in cycles */
1285     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1286 }
1287
1288 /*
1289  * For a given RRA, iterate over the data sources and call the appropriate
1290  * consolidation function.
1291  *
1292  * Returns 0 on success, -1 on error.
1293  */
1294 static int update_cdp_prep(
1295     rrd_t *rrd, 
1296     unsigned long elapsed_pdp_st, 
1297     unsigned long start_pdp_offset, 
1298     unsigned long *rra_step_cnt, 
1299     int rra_idx, 
1300     rrd_value_t *pdp_temp, 
1301     rrd_value_t *last_seasonal_coef, 
1302     rrd_value_t *seasonal_coef, 
1303     int current_cf) 
1304 {
1305     unsigned long ds_idx, cdp_idx;
1306     /* update CDP_PREP areas */
1307     /* loop over data soures within each RRA */
1308     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1309
1310         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1311
1312         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1313             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf, 
1314                             pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1315                             elapsed_pdp_st, start_pdp_offset,
1316                             rrd->rra_def[rra_idx].pdp_cnt,
1317                             rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1318         } else { 
1319             /* Nothing to consolidate if there's one PDP per CDP. However, if
1320              * we've missed some PDPs, let's update null counters etc. */
1321             if (elapsed_pdp_st > 2) {
1322                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef, 
1323                                 rra_idx, ds_idx, cdp_idx, current_cf);
1324             }
1325         }
1326
1327         if (rrd_test_error())
1328             return -1;
1329     }       /* endif data sources loop */
1330     return 0;
1331 }
1332
1333 /*
1334  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1335  * primary value, secondary value, and # of unknowns.
1336  */
1337 static void update_cdp(
1338     unival *scratch,
1339     int current_cf,
1340     rrd_value_t pdp_temp_val,
1341     unsigned long rra_step_cnt,
1342     unsigned long elapsed_pdp_st,
1343     unsigned long start_pdp_offset,
1344     unsigned long pdp_cnt,
1345     rrd_value_t xff,
1346     int i, int ii)
1347 {
1348     /* shorthand variables */
1349     rrd_value_t            *cdp_val = &scratch[CDP_val].u_val;
1350     rrd_value_t    *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1351     rrd_value_t  *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1352     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1353
1354     if (rra_step_cnt) {
1355         /* If we are in this block, as least 1 CDP value will be written to
1356          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1357          * to be written, then the "fill in" value is the CDP_secondary_val
1358          * entry. */
1359         if (isnan(pdp_temp_val)) {
1360             *cdp_unkn_pdp_cnt += start_pdp_offset;
1361             *cdp_secondary_val = DNAN;
1362         } else {
1363             /* CDP_secondary value is the RRA "fill in" value for intermediary
1364              * CDP data entries. No matter the CF, the value is the same because
1365              * the average, max, min, and last of a list of identical values is
1366              * the same, namely, the value itself. */
1367             *cdp_secondary_val = pdp_temp_val;
1368         }
1369
1370         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1371             *cdp_primary_val = DNAN;
1372             if (current_cf == CF_AVERAGE) {
1373                 *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1374                                 start_pdp_offset, pdp_cnt);
1375             } else {
1376                 *cdp_val = pdp_temp_val;
1377             }
1378         } else {
1379             initialize_cdp_val(scratch, current_cf, pdp_temp_val, 
1380                             elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1381         }   /* endif meets xff value requirement for a valid value */
1382         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1383          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1384         if (isnan(pdp_temp_val))
1385             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1386         else
1387             *cdp_unkn_pdp_cnt = 0;
1388     } else {    /* rra_step_cnt[i]  == 0 */
1389
1390 #ifdef DEBUG
1391         if (isnan(*cdp_val)) {
1392             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1393                             i, ii);
1394         } else {
1395             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1396                             i, ii, *cdp_val);
1397         }
1398 #endif
1399         if (isnan(pdp_temp_val)) {
1400             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1401         } else {
1402             *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
1403         }
1404     }
1405 }
1406
1407 /*
1408  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1409  * on the type of consolidation function.
1410  */
1411 static void initialize_cdp_val(
1412     unival *scratch, 
1413     int current_cf,
1414     rrd_value_t pdp_temp_val,
1415     unsigned long elapsed_pdp_st, 
1416     unsigned long start_pdp_offset,
1417     unsigned long pdp_cnt) 
1418 {
1419     rrd_value_t cum_val, cur_val;
1420
1421     switch (current_cf) {
1422             case CF_AVERAGE:
1423                     cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1424                     cur_val = IFDNAN(pdp_temp_val, 0.0);
1425                     scratch[CDP_primary_val].u_val =
1426                             (cum_val + cur_val * start_pdp_offset) /
1427                             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1428                     scratch[CDP_val].u_val = initialize_average_carry_over(
1429                                     pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1430                     break;
1431             case CF_MAXIMUM:
1432                     cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1433                     cur_val = IFDNAN(pdp_temp_val, -DINF);
1434 #if 0
1435 #ifdef DEBUG
1436                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1437                         fprintf(stderr,
1438                                         "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1439                                         i, ii);
1440                         exit(-1);
1441                     }
1442 #endif
1443 #endif
1444                     if (cur_val > cum_val)
1445                         scratch[CDP_primary_val].u_val = cur_val;
1446                     else
1447                         scratch[CDP_primary_val].u_val = cum_val;
1448                     /* initialize carry over value */
1449                     scratch[CDP_val].u_val = pdp_temp_val;
1450                     break;
1451             case CF_MINIMUM:
1452                     cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1453                     cur_val = IFDNAN(pdp_temp_val, DINF);
1454 #if 0
1455 #ifdef DEBUG
1456                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1457                         fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1458                                         i, ii);
1459                         exit(-1);
1460                     }
1461 #endif
1462 #endif
1463                     if (cur_val < cum_val)
1464                         scratch[CDP_primary_val].u_val = cur_val;
1465                     else
1466                         scratch[CDP_primary_val].u_val = cum_val;
1467                     /* initialize carry over value */
1468                     scratch[CDP_val].u_val = pdp_temp_val;
1469                     break;
1470             case CF_LAST:
1471             default:
1472                     scratch[CDP_primary_val].u_val = pdp_temp_val;
1473                     /* initialize carry over value */
1474                     scratch[CDP_val].u_val = pdp_temp_val;
1475                     break;
1476     }
1477 }
1478
1479 /*
1480  * Update the consolidation function for Holt-Winters functions as
1481  * well as other functions that don't actually consolidate multiple
1482  * PDPs.
1483  */
1484 static void reset_cdp(
1485     rrd_t *rrd,
1486     unsigned long elapsed_pdp_st, 
1487     rrd_value_t *pdp_temp,
1488     rrd_value_t *last_seasonal_coef,
1489     rrd_value_t *seasonal_coef,
1490     int rra_idx, int ds_idx, int cdp_idx, 
1491     enum cf_en current_cf)
1492 {
1493     unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1494
1495     switch (current_cf) {
1496             case CF_AVERAGE:
1497             default:
1498                     scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1499                     scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1500                     break;
1501             case CF_SEASONAL:
1502             case CF_DEVSEASONAL:
1503                     /* need to update cached seasonal values, so they are consistent
1504                      * with the bulk update */
1505                     /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1506                      * CDP_last_deviation are the same. */
1507                     scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1508                     scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1509                     break;
1510             case CF_HWPREDICT:
1511             case CF_MHWPREDICT:
1512                     /* need to update the null_count and last_null_count.
1513                      * even do this for non-DNAN pdp_temp because the
1514                      * algorithm is not learning from batch updates. */
1515                     scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1516                     scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1517                     /* fall through */
1518             case CF_DEVPREDICT:
1519                     scratch[CDP_primary_val].u_val = DNAN;
1520                     scratch[CDP_secondary_val].u_val = DNAN;
1521                     break;
1522             case CF_FAILURES:
1523                     /* do not count missed bulk values as failures */
1524                     scratch[CDP_primary_val].u_val = 0;
1525                     scratch[CDP_secondary_val].u_val = 0;
1526                     /* need to reset violations buffer.
1527                      * could do this more carefully, but for now, just
1528                      * assume a bulk update wipes away all violations. */
1529                     erase_violations(rrd, cdp_idx, rra_idx);
1530                     break;
1531     }
1532 }
1533
1534 static rrd_value_t initialize_average_carry_over(
1535     rrd_value_t pdp_temp_val, 
1536     unsigned long elapsed_pdp_st,
1537     unsigned long start_pdp_offset,
1538     unsigned long pdp_cnt)
1539 {
1540     /* initialize carry over value */
1541     if (isnan(pdp_temp_val)) {
1542         return DNAN;
1543     } 
1544     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1545 }
1546
1547 /*
1548  * Update or initialize a CDP value based on the consolidation
1549  * function.
1550  *
1551  * Returns the new value.
1552  */
1553 static rrd_value_t calculate_cdp_val(
1554     rrd_value_t cdp_val,
1555     rrd_value_t pdp_temp_val,
1556     unsigned long elapsed_pdp_st,
1557     int current_cf, int i, int ii)
1558 {
1559     if (isnan(cdp_val)) {
1560         if (current_cf == CF_AVERAGE) {
1561             pdp_temp_val *= elapsed_pdp_st;
1562         }
1563 #ifdef DEBUG
1564         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1565                         i, ii, pdp_temp_val);
1566 #endif
1567         return pdp_temp_val;
1568     } 
1569     if (current_cf == CF_AVERAGE) 
1570         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1571     if (current_cf == CF_MINIMUM)
1572         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1573     if (current_cf == CF_MAXIMUM)
1574         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1575
1576     return pdp_temp_val;
1577 }
1578
1579 /*
1580  * For each RRA, update the seasonal values and then call update_aberrant_CF
1581  * for each data source.
1582  *
1583  * Return 0 on success, -1 on error.
1584  */
1585 static int update_aberrant_cdps(
1586     rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
1587     unsigned long *rra_current, unsigned long elapsed_pdp_st, 
1588     rrd_value_t *pdp_temp, rrd_value_t  **seasonal_coef) 
1589 {
1590     unsigned long rra_idx, ds_idx, j;
1591
1592     /* number of PDP steps since the last update that
1593      * are assigned to the first CDP to be generated
1594      * since the last update. */
1595     unsigned short scratch_idx;
1596     unsigned long  rra_start;
1597     enum cf_en     current_cf;
1598
1599     /* this loop is only entered if elapsed_pdp_st < 3 */
1600     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1601                     j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1602         rra_start = rra_begin;
1603         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1604             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1605                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1606                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1607                     if (scratch_idx == CDP_primary_val) {
1608                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1609                                         elapsed_pdp_st + 1, seasonal_coef);
1610                     } else {
1611                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1612                                         elapsed_pdp_st + 2, seasonal_coef);
1613                     }
1614                     *rra_current = rrd_tell(rrd_file);
1615                 }
1616                 if (rrd_test_error())
1617                     return -1;
1618                 /* loop over data soures within each RRA */
1619                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1620                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1621                                     rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
1622                                     rra_idx, ds_idx, scratch_idx, *seasonal_coef);
1623                 }
1624             }
1625             rra_start += rrd->rra_def[rra_idx].row_cnt 
1626                     * rrd->stat_head->ds_cnt 
1627                     * sizeof(rrd_value_t);
1628         }
1629     }
1630     return 0;
1631 }
1632
1633 /* 
1634  * Move sequentially through the file, writing one RRA at a time.  Note this
1635  * architecture divorces the computation of CDP with flushing updated RRA
1636  * entries to disk.
1637  *
1638  * Return 0 on success, -1 on error.
1639  */
1640 static int write_to_rras(
1641     rrd_t *rrd, 
1642     rrd_file_t *rrd_file, 
1643     unsigned long *rra_step_cnt,
1644     unsigned long rra_begin, 
1645     unsigned long *rra_current,
1646     time_t current_time,
1647     info_t **pcdp_summary)
1648 {
1649     unsigned long rra_idx;
1650     unsigned long rra_start;    
1651     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1652     /* number of PDP steps since the last update that
1653      * are assigned to the first CDP to be generated
1654      * since the last update. */
1655     unsigned short scratch_idx;
1656     time_t    rra_time = 0; /* time of update for a RRA */
1657
1658     /* Ready to write to disk */
1659     rra_start = rra_begin;
1660     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1661         /* skip unless there's something to write */
1662         if (rra_step_cnt[rra_idx]) {
1663             /* write the first row */
1664 #ifdef DEBUG
1665             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1666 #endif
1667             rrd->rra_ptr[rra_idx].cur_row++;
1668             if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
1669                 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1670             /* positition on the first row */
1671             rra_pos_tmp = rra_start +
1672                     (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1673                     sizeof(rrd_value_t);
1674             if (rra_pos_tmp != *rra_current) {
1675                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1676                     rrd_set_error("seek error in rrd");
1677                     return -1;
1678                 }
1679                 *rra_current = rra_pos_tmp;
1680             }
1681 #ifdef DEBUG
1682             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1683 #endif
1684             scratch_idx = CDP_primary_val;
1685             if (*pcdp_summary != NULL) {
1686                 rra_time = (current_time - current_time
1687                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1688                                         rrd->stat_head->pdp_step))
1689                         - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
1690                                         rrd->stat_head->pdp_step);
1691             }
1692             if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, scratch_idx, 
1693                                     pcdp_summary, &rra_time) == -1)
1694                 return -1;
1695             if (rrd_test_error())
1696                 return -1;
1697
1698             /* write other rows of the bulk update, if any */
1699             scratch_idx = CDP_secondary_val;
1700             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1701                 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
1702 #ifdef DEBUG
1703                     fprintf(stderr,
1704                                     "Wraparound for RRA %s, %lu updates left\n",
1705                                     rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
1706 #endif
1707                     /* wrap */
1708                     rrd->rra_ptr[rra_idx].cur_row = 0;
1709                     /* seek back to beginning of current rra */
1710                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1711                         rrd_set_error("seek error in rrd");
1712                         return -1;
1713                     }
1714 #ifdef DEBUG
1715                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1716                                     rrd_file->pos);
1717 #endif
1718                     *rra_current = rra_start;
1719                 }
1720                 if (*pcdp_summary != NULL) {
1721                     rra_time = (current_time - current_time
1722                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1723                                             rrd->stat_head->pdp_step))
1724                             -
1725                             ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
1726                              rrd->stat_head->pdp_step);
1727                 }
1728                 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1729                                         scratch_idx, pcdp_summary, &rra_time) == -1)
1730                     return -1;
1731             }
1732
1733             if (rrd_test_error())
1734                 return -1;
1735         }
1736         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1737                 sizeof(rrd_value_t);
1738     }           /* RRA LOOP */
1739
1740     return 0;
1741 }
1742
1743 /*
1744  * Write out one row of values (one value per DS) to the archive.
1745  *
1746  * Returns 0 on success, -1 on error.
1747  */
1748 static int write_RRA_row(
1749     rrd_file_t *rrd_file,
1750     rrd_t *rrd,
1751     unsigned long rra_idx,
1752     unsigned long *rra_current,
1753     unsigned short CDP_scratch_idx,
1754     info_t **pcdp_summary,
1755     time_t *rra_time)
1756 {
1757     unsigned long ds_idx, cdp_idx;
1758     infoval   iv;
1759
1760     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1761         /* compute the cdp index */
1762         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1763 #ifdef DEBUG
1764         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1765                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1766                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1767 #endif
1768         if (pcdp_summary != NULL) {
1769             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1770             /* append info to the return hash */
1771             *pcdp_summary = info_push(*pcdp_summary,
1772                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", *rra_time,
1773                            rrd->rra_def[rra_idx].cf_nam,
1774                            rrd->rra_def[rra_idx].pdp_cnt,
1775                            rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1776         }
1777         if (rrd_write(rrd_file,
1778              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1779              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1780             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1781             return -1;
1782         }
1783         *rra_current += sizeof(rrd_value_t);
1784     }
1785     return 0;
1786 }
1787
1788 /*
1789  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1790  *
1791  * Returns 0 on success, -1 otherwise
1792  */
1793 static int smooth_all_rras(
1794     rrd_t *rrd, 
1795     rrd_file_t *rrd_file, 
1796     unsigned long rra_begin) 
1797 {
1798     unsigned long rra_start = rra_begin;
1799     unsigned long rra_idx;
1800     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1801         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1802                         cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1803 #ifdef DEBUG
1804             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1805 #endif
1806             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1807             if (rrd_test_error())
1808                 return -1;
1809         }
1810         rra_start += rrd->rra_def[rra_idx].row_cnt
1811                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1812     }
1813     return 0;
1814 }
1815
1816 #ifndef HAVE_MMAP
1817 /*
1818  * Flush changes to disk (unless we're using mmap)
1819  *
1820  * Returns 0 on success, -1 otherwise
1821  */
1822 static int write_changes_to_disk(
1823     rrd_t *rrd, rrd_file_t *rrd_file, int version) 
1824 {
1825     /* we just need to write back the live header portion now*/
1826     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1827                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
1828                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
1829                  SEEK_SET) != 0) {
1830         rrd_set_error("seek rrd for live header writeback");
1831         return -1;
1832     }
1833     if (version >= 3) {
1834         if (rrd_write(rrd_file, rrd->live_head,
1835                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1836             rrd_set_error("rrd_write live_head to rrd");
1837             return -1;
1838         }
1839     } else {
1840         if (rrd_write(rrd_file, &rrd->live_head->last_up,
1841                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1842             rrd_set_error("rrd_write live_head to rrd");
1843             return -1;
1844         }
1845     }
1846
1847
1848     if (rrd_write(rrd_file, rrd->pdp_prep,
1849                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
1850         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
1851         rrd_set_error("rrd_write pdp_prep to rrd");
1852         return -1;
1853     }
1854
1855     if (rrd_write(rrd_file, rrd->cdp_prep,
1856                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1857                   rrd->stat_head->ds_cnt)
1858         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1859                       rrd->stat_head->ds_cnt)) {
1860
1861         rrd_set_error("rrd_write cdp_prep to rrd");
1862         return -1;
1863     }
1864
1865     if (rrd_write(rrd_file, rrd->rra_ptr,
1866                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
1867         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
1868         rrd_set_error("rrd_write rra_ptr to rrd");
1869         return -1;
1870     }
1871     return 0;
1872 }
1873 #endif