d015cb21ed3c551aa6d5589faf0fe929fefe3e68
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
38
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
60
61 /* FUNCTION PROTOTYPES */
62
63 int       rrd_update_r(
64     const char *filename,
65     const char *tmplt,
66     int argc,
67     const char **argv);
68 int       _rrd_update(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv,
73     info_t *);
74
75 static int allocate_data_structures(
76     rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
77     long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
78     unsigned long **skip_update, rrd_value_t **pdp_new);
79
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81     unsigned long *tmpl_cnt, long *tmpl_idx);
82
83 static int process_arg(
84     char          *step_start, 
85     rrd_t         *rrd, 
86     rrd_file_t    *rrd_file, 
87     unsigned long  rra_begin,
88     unsigned long *rra_current,
89     time_t        *current_time,
90     unsigned long *current_time_usec,
91     rrd_value_t   *pdp_temp,
92     rrd_value_t   *pdp_new,
93     unsigned long *rra_step_cnt,
94     char         **updvals,
95     long          *tmpl_idx,
96     unsigned long  tmpl_cnt,
97     info_t       **pcdp_summary,
98     int            version,
99     unsigned long *skip_update,
100     int           *schedule_smooth);
101
102 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
103     unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec, 
104     int version);
105
106 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals, 
107     time_t *current_time, unsigned long *current_time_usec, int version);
108
109 static int update_pdp_prep(rrd_t *rrd, char **updvals, 
110     rrd_value_t *pdp_new, double interval);
111
112 static int calculate_elapsed_steps(rrd_t *rrd, 
113     unsigned long current_time, unsigned long current_time_usec,
114     double interval, double *pre_int, double *post_int, 
115     unsigned long *proc_pdp_cnt);
116
117 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
118
119 static int process_all_pdp_st(rrd_t *rrd, double interval, 
120     double pre_int, double post_int, unsigned long elapsed_pdp_st, 
121     rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
122
123 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
124     double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new, 
125     rrd_value_t *pdp_temp);
126
127 static int update_all_cdp_prep(
128     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
129     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
130     rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
131     rrd_value_t *pdp_temp, unsigned long *rra_current, 
132     unsigned long *skip_update, int *schedule_smooth);
133
134 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, 
135     unsigned long elapsed_pdp_st);
136
137 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st, 
138     unsigned long start_pdp_offset, unsigned long *rra_step_cnt, 
139     int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
140     rrd_value_t *seasonal_coef, int current_cf);
141
142 static void update_cdp(unival *scratch, int current_cf, 
143     rrd_value_t pdp_temp_val, unsigned long rra_step_cnt, 
144     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
145     unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
146
147 static void initialize_cdp_val(unival *scratch, int current_cf,
148     rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st, 
149     unsigned long start_pdp_offset, unsigned long pdp_cnt);
150
151 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st, 
152     rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef, 
153     rrd_value_t *seasonal_coef, 
154     int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
155
156 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
157     unsigned long elapsed_pdp_st, unsigned long start_pdp_offset, 
158     unsigned long pdp_cnt);
159
160 static rrd_value_t calculate_cdp_val(
161     rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
162     unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
163
164 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file, 
165     unsigned long rra_begin, unsigned long *rra_current, 
166     unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
167
168 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
169     unsigned long *rra_step_cnt, unsigned long rra_begin, 
170     unsigned long *rra_current, time_t current_time, 
171     unsigned long *skip_update, info_t **pcdp_summary);
172
173 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
174     unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
175     time_t rra_time);
176
177 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
178     unsigned long rra_begin);
179
180 #ifndef HAVE_MMAP
181 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file, 
182     int version);
183 #endif
184
185 /*
186  * normalize time as returned by gettimeofday. usec part must
187  * be always >= 0
188  */
189 static inline void normalize_time(
190     struct timeval *t)
191 {
192     if (t->tv_usec < 0) {
193         t->tv_sec--;
194         t->tv_usec += 1e6L;
195     }
196 }
197
198 /*
199  * Sets current_time and current_time_usec based on the current time.
200  * current_time_usec is set to 0 if the version number is 1 or 2.
201  */
202 static inline void initialize_time(
203     time_t *current_time, unsigned long *current_time_usec,
204     int version) 
205 {
206     struct timeval tmp_time;    /* used for time conversion */
207
208     gettimeofday(&tmp_time, 0);
209     normalize_time(&tmp_time);
210     *current_time = tmp_time.tv_sec;
211     if (version >= 3) {
212         *current_time_usec = tmp_time.tv_usec;
213     } else {
214         *current_time_usec = 0;
215     }
216 }
217
218 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
219
220 info_t   *rrd_update_v(
221     int argc,
222     char **argv)
223 {
224     char     *tmplt = NULL;
225     info_t   *result = NULL;
226     infoval   rc;
227     struct option long_options[] = {
228         {"template", required_argument, 0, 't'},
229         {0, 0, 0, 0}
230     };
231
232     rc.u_int = -1;
233     optind = 0;
234     opterr = 0;         /* initialize getopt */
235
236     while (1) {
237         int       option_index = 0;
238         int       opt;
239
240         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
241
242         if (opt == EOF)
243             break;
244
245         switch (opt) {
246         case 't':
247             tmplt = optarg;
248             break;
249
250         case '?':
251             rrd_set_error("unknown option '%s'", argv[optind - 1]);
252             goto end_tag;
253         }
254     }
255
256     /* need at least 2 arguments: filename, data. */
257     if (argc - optind < 2) {
258         rrd_set_error("Not enough arguments");
259         goto end_tag;
260     }
261     rc.u_int = 0;
262     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
263     rc.u_int = _rrd_update(argv[optind], tmplt,
264                            argc - optind - 1,
265                            (const char **) (argv + optind + 1), result);
266     result->value.u_int = rc.u_int;
267   end_tag:
268     return result;
269 }
270
271 int rrd_update(
272     int argc,
273     char **argv)
274 {
275     struct option long_options[] = {
276         {"template", required_argument, 0, 't'},
277         {0, 0, 0, 0}
278     };
279     int       option_index = 0;
280     int       opt;
281     char     *tmplt = NULL;
282     int       rc = -1;
283
284     optind = 0;
285     opterr = 0;         /* initialize getopt */
286
287     while (1) {
288         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
289
290         if (opt == EOF)
291             break;
292
293         switch (opt) {
294         case 't':
295             tmplt = strdup(optarg);
296             break;
297
298         case '?':
299             rrd_set_error("unknown option '%s'", argv[optind - 1]);
300             goto out;
301         }
302     }
303
304     /* need at least 2 arguments: filename, data. */
305     if (argc - optind < 2) {
306         rrd_set_error("Not enough arguments");
307         goto out;
308     }
309
310     rc = rrd_update_r(argv[optind], tmplt,
311                       argc - optind - 1, (const char **) (argv + optind + 1));
312 out:
313     free(tmplt);
314     return rc;
315 }
316
317 int rrd_update_r(
318     const char *filename,
319     const char *tmplt,
320     int argc,
321     const char **argv)
322 {
323     return _rrd_update(filename, tmplt, argc, argv, NULL);
324 }
325
326 int _rrd_update(
327     const char *filename,
328     const char *tmplt,
329     int argc,
330     const char **argv,
331     info_t *pcdp_summary)
332 {
333
334     int           arg_i = 2;
335
336     unsigned long rra_begin;    /* byte pointer to the rra
337                                  * area in the rrd file.  this
338                                  * pointer never changes value */
339     unsigned long rra_current;  /* byte pointer to the current write
340                                  * spot in the rrd file. */
341     rrd_value_t   *pdp_new;     /* prepare the incoming data to be added 
342                                  * to the existing entry */
343     rrd_value_t   *pdp_temp;    /* prepare the pdp values to be added 
344                                  * to the cdp values */
345
346     long          *tmpl_idx;    /* index representing the settings
347                                  * transported by the tmplt index */
348     unsigned long tmpl_cnt = 2; /* time and data */
349     rrd_t         rrd;
350     time_t        current_time = 0;
351     unsigned long current_time_usec = 0;    /* microseconds part of current time */
352     char        **updvals;
353     int           schedule_smooth = 0;
354
355     /* number of elapsed PDP steps since last update */
356     unsigned long *rra_step_cnt = NULL;
357
358     int            version;     /* rrd version */
359     rrd_file_t    *rrd_file;
360     char          *arg_copy;    /* for processing the argv */
361     unsigned long *skip_update; /* RRAs to advance but not write */
362
363     /* need at least 1 arguments: data. */
364     if (argc < 1) {
365         rrd_set_error("Not enough arguments");
366         goto err_out;
367     }
368
369     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
370         goto err_free;
371     }
372     /* We are now at the beginning of the rra's */
373     rra_current = rra_begin = rrd_file->header_len;
374
375     version = atoi(rrd.stat_head->version);
376
377     initialize_time(&current_time, &current_time_usec, version);
378
379     /* get exclusive lock to whole file.
380      * lock gets removed when we close the file.
381      */
382     if (LockRRD(rrd_file->fd) != 0) {
383         rrd_set_error("could not lock RRD");
384         goto err_close;
385     }
386
387     if (allocate_data_structures(&rrd, &updvals, 
388                     &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
389                     &rra_step_cnt, &skip_update, &pdp_new) == -1) {
390         goto err_close;
391     }
392
393     /* loop through the arguments. */
394     for (arg_i = 0; arg_i < argc; arg_i++) {
395         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
396             rrd_set_error("failed duplication argv entry");
397             break;
398         }
399         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, 
400                         &current_time, &current_time_usec, pdp_temp, pdp_new,
401                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, 
402                         version, skip_update, &schedule_smooth) == -1) {
403             free(arg_copy);
404             break;
405         }
406         free(arg_copy);
407     }
408
409     free(rra_step_cnt);
410
411     /* if we got here and if there is an error and if the file has not been
412      * written to, then close things up and return. */
413     if (rrd_test_error()) {
414         goto err_free_structures;
415     }
416
417 #ifndef HAVE_MMAP
418     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
419         goto err_free_structures;
420     }
421 #endif
422
423     /* calling the smoothing code here guarantees at most one smoothing
424      * operation per rrd_update call. Unfortunately, it is possible with bulk
425      * updates, or a long-delayed update for smoothing to occur off-schedule.
426      * This really isn't critical except during the burn-in cycles. */
427     if (schedule_smooth) {
428         smooth_all_rras(&rrd, rrd_file, rra_begin);
429     }
430
431 /*    rrd_dontneed(rrd_file,&rrd); */
432     rrd_free(&rrd);
433     rrd_close(rrd_file);
434
435     free(pdp_new);
436     free(tmpl_idx);
437     free(pdp_temp);
438     free(skip_update);
439     free(updvals);
440     return 0;
441
442   err_free_structures:
443     free(pdp_new);
444     free(tmpl_idx);
445     free(pdp_temp);
446     free(skip_update);
447     free(updvals);
448   err_close:
449     rrd_close(rrd_file);
450   err_free:
451     rrd_free(&rrd);
452   err_out:
453     return -1;
454 }
455
456 /*
457  * get exclusive lock to whole file.
458  * lock gets removed when we close the file
459  *
460  * returns 0 on success
461  */
462 int LockRRD(
463     int in_file)
464 {
465     int       rcstat;
466
467     {
468 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
469         struct _stat st;
470
471         if (_fstat(in_file, &st) == 0) {
472             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
473         } else {
474             rcstat = -1;
475         }
476 #else
477         struct flock lock;
478
479         lock.l_type = F_WRLCK;  /* exclusive write lock */
480         lock.l_len = 0; /* whole file */
481         lock.l_start = 0;   /* start of file */
482         lock.l_whence = SEEK_SET;   /* end of file */
483
484         rcstat = fcntl(in_file, F_SETLK, &lock);
485 #endif
486     }
487
488     return (rcstat);
489 }
490
491 /*
492  * Allocate some important arrays used, and initialize the template.
493  *
494  * When it returns, either all of the structures are allocated
495  * or none of them are.
496  *
497  * Returns 0 on success, -1 on error.
498  */
499 static int allocate_data_structures(
500     rrd_t         *rrd, 
501     char        ***updvals, 
502     rrd_value_t  **pdp_temp, 
503     const char    *tmplt, 
504     long         **tmpl_idx, 
505     unsigned long *tmpl_cnt, 
506     unsigned long **rra_step_cnt, 
507     unsigned long **skip_update,
508     rrd_value_t  **pdp_new) 
509 {
510     unsigned i, ii;
511     if ((*updvals = (char **)malloc(sizeof(char *) 
512                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513         rrd_set_error("allocating updvals pointer array.");
514         return -1;
515     }
516     if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
517                            * rrd->stat_head->ds_cnt)) == NULL) {
518         rrd_set_error("allocating pdp_temp.");
519         goto err_free_updvals;
520     }
521     if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
522                            * rrd->stat_head->rra_cnt)) == NULL) {
523         rrd_set_error("allocating skip_update.");
524         goto err_free_pdp_temp;
525     }
526     if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
527                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
528         rrd_set_error("allocating tmpl_idx.");
529         goto err_free_skip_update;
530     }
531     if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) 
532                            * (rrd->stat_head->rra_cnt))) == NULL) {
533         rrd_set_error("allocating rra_step_cnt.");
534         goto err_free_tmpl_idx;
535     }
536
537     /* initialize tmplt redirector */
538     /* default config example (assume DS 1 is a CDEF DS)
539        tmpl_idx[0] -> 0; (time)
540        tmpl_idx[1] -> 1; (DS 0)
541        tmpl_idx[2] -> 3; (DS 2)
542        tmpl_idx[3] -> 4; (DS 3) */
543     (*tmpl_idx)[0] = 0;    /* time */
544     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
545         if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
546             (*tmpl_idx)[ii++] = i;
547     }
548     *tmpl_cnt = ii;
549
550     if (tmplt != NULL) {
551         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
552             goto err_free_rra_step_cnt;
553         }
554     }
555
556     if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
557                           * rrd->stat_head->ds_cnt)) == NULL) {
558         rrd_set_error("allocating pdp_new.");
559         goto err_free_rra_step_cnt;
560     }
561
562     return 0;
563
564 err_free_rra_step_cnt:
565     free(*rra_step_cnt);
566 err_free_tmpl_idx:
567     free(*tmpl_idx);
568 err_free_skip_update:
569     free(*skip_update);
570 err_free_pdp_temp:
571     free(*pdp_temp);
572 err_free_updvals:
573     free(*updvals);
574     return -1;
575 }
576
577 /*
578  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
579  *
580  * Returns 0 on success.
581  */
582 static int parse_template(
583     rrd_t *rrd, const char *tmplt, 
584     unsigned long *tmpl_cnt, long *tmpl_idx)
585 {
586     char          *dsname, *tmplt_copy;
587     unsigned int   tmpl_len, i;
588     int ret = 0;
589
590     *tmpl_cnt = 1;   /* the first entry is the time */
591
592     /* we should work on a writeable copy here */
593     if ((tmplt_copy = strdup(tmplt)) == NULL) {
594         rrd_set_error("error copying tmplt '%s'", tmplt);
595         ret = -1;
596         goto out;
597     }
598
599     dsname = tmplt_copy;
600     tmpl_len = strlen(tmplt_copy);
601     for (i = 0; i <= tmpl_len; i++) {
602         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
603             tmplt_copy[i] = '\0';
604             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
605                 rrd_set_error("tmplt contains more DS definitions than RRD");
606                 ret = -1;
607                 goto out_free_tmpl_copy;
608             }
609             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
610                 rrd_set_error("unknown DS name '%s'", dsname);
611                 ret = -1;
612                 goto out_free_tmpl_copy;
613             } 
614             /* go to the next entry on the tmplt_copy */
615             if (i < tmpl_len)
616                 dsname = &tmplt_copy[i+1];
617         }
618     }
619 out_free_tmpl_copy:
620     free(tmplt_copy);
621 out:
622     return ret;
623 }
624
625 /*
626  * Parse an update string, updates the primary data points (PDPs)
627  * and consolidated data points (CDPs), and writes changes to the RRAs.
628  *
629  * Returns 0 on success, -1 on error.
630  */
631 static int process_arg(
632     char           *step_start, 
633     rrd_t          *rrd, 
634     rrd_file_t     *rrd_file, 
635     unsigned long  rra_begin,
636     unsigned long *rra_current,
637     time_t        *current_time,
638     unsigned long *current_time_usec,
639     rrd_value_t   *pdp_temp,
640     rrd_value_t   *pdp_new,
641     unsigned long *rra_step_cnt,
642     char         **updvals,
643     long          *tmpl_idx,
644     unsigned long  tmpl_cnt,
645     info_t       **pcdp_summary,
646     int            version,
647     unsigned long *skip_update,
648     int           *schedule_smooth)
649 {
650     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
651
652     /* a vector of future Holt-Winters seasonal coefs */
653     unsigned long elapsed_pdp_st;
654
655     double    interval, pre_int, post_int;  /* interval between this and
656                                              * the last run */
657     unsigned long proc_pdp_cnt;
658     unsigned long rra_start;
659
660     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt, 
661             current_time, current_time_usec, version) == -1) {
662         return -1;
663     }
664     /* seek to the beginning of the rra's */
665     if (*rra_current != rra_begin) {
666 #ifndef HAVE_MMAP
667         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
668             rrd_set_error("seek error in rrd");
669             return -1;
670         }
671 #endif
672         *rra_current = rra_begin;
673     }
674     rra_start = rra_begin;
675
676     interval = (double) (*current_time - rrd->live_head->last_up)
677             + (double) ((long) *current_time_usec -
678                             (long) rrd->live_head->last_up_usec) / 1e6f;
679
680     /* process the data sources and update the pdp_prep 
681      * area accordingly */
682     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
683         return -1;
684     }
685
686     elapsed_pdp_st = calculate_elapsed_steps(rrd, 
687                     *current_time, *current_time_usec, 
688                     interval, &pre_int, &post_int,
689                     &proc_pdp_cnt);
690
691     /* has a pdp_st moment occurred since the last run ? */
692     if (elapsed_pdp_st == 0) {
693         /* no we have not passed a pdp_st moment. therefore update is simple */
694         simple_update(rrd, interval, pdp_new);
695     } else {
696         /* an pdp_st has occurred. */
697         if (process_all_pdp_st(rrd, interval, 
698                                pre_int, post_int,
699                                elapsed_pdp_st, 
700                                pdp_new, pdp_temp) == -1) 
701         {
702             return -1;
703         }
704         if (update_all_cdp_prep(rrd, rra_step_cnt, 
705                                 rra_begin, rrd_file,
706                                 elapsed_pdp_st,
707                                 proc_pdp_cnt,
708                                 &last_seasonal_coef,
709                                 &seasonal_coef,
710                                 pdp_temp, rra_current,
711                                 skip_update, schedule_smooth) == -1)
712         {
713             goto err_free_coefficients;
714         }
715         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current, 
716                 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
717         {
718             goto err_free_coefficients;
719         }
720         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin, 
721                 rra_current, *current_time, skip_update, pcdp_summary) == -1)
722         {
723             goto err_free_coefficients;
724         }
725     }               /* endif a pdp_st has occurred */
726     rrd->live_head->last_up = *current_time;
727     rrd->live_head->last_up_usec = *current_time_usec;
728
729     free(seasonal_coef);
730     free(last_seasonal_coef);
731     return 0;
732
733 err_free_coefficients:
734     free(seasonal_coef);
735     free(last_seasonal_coef);
736     return -1;
737 }
738
739 /*
740  * Parse a DS string (time + colon-separated values), storing the
741  * results in current_time, current_time_usec, and updvals.
742  *
743  * Returns 0 on success, -1 on error.
744  */
745 static int parse_ds(
746     rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
747     unsigned long tmpl_cnt, time_t *current_time,
748     unsigned long *current_time_usec, int version)
749 {
750     char *p;
751     unsigned long i;
752     char timesyntax;
753
754     updvals[0] = input;
755     /* initialize all ds input to unknown except the first one
756        which has always got to be set */
757     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
758         updvals[i] = "U";
759
760     /* separate all ds elements; first must be examined separately
761        due to alternate time syntax */
762     if ((p = strchr(input, '@')) != NULL) {
763         timesyntax = '@';
764     } else if ((p = strchr(input, ':')) != NULL) {
765         timesyntax = ':';
766     } else {
767         rrd_set_error("expected timestamp not found in data source from %s",
768                  input);
769         return -1;
770     }
771     *p = '\0';
772     i = 1;
773     updvals[tmpl_idx[i++]] = p+1;
774     while (*(++p)) {
775         if (*p == ':') {
776             *p = '\0';
777             if (i < tmpl_cnt) {
778                 updvals[tmpl_idx[i++]] = p+1;
779             }
780         }
781     }
782
783     if (i != tmpl_cnt) {
784         rrd_set_error("expected %lu data source readings (got %lu) from %s",
785                  tmpl_cnt - 1, i, input);
786         return -1;
787     }
788
789     if (get_time_from_reading(rrd, timesyntax, updvals, 
790                             current_time, current_time_usec, 
791                             version) == -1) {
792         return -1;
793     }
794     return 0;
795 }
796
797 /*
798  * Parse the time in a DS string, store it in current_time and 
799  * current_time_usec and verify that it's later than the last
800  * update for this DS.
801  *
802  * Returns 0 on success, -1 on error.
803  */
804 static int get_time_from_reading(
805     rrd_t *rrd, char timesyntax, char **updvals, 
806     time_t *current_time, unsigned long *current_time_usec,
807     int version)
808 {
809     double    tmp;
810     char     *parsetime_error = NULL;
811     char     *old_locale;
812     struct rrd_time_value ds_tv;
813     struct timeval tmp_time;    /* used for time conversion */
814
815     /* get the time from the reading ... handle N */
816     if (timesyntax == '@') { /* at-style */
817         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
818             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
819             return -1;
820         }
821         if (ds_tv.type == RELATIVE_TO_END_TIME ||
822                         ds_tv.type == RELATIVE_TO_START_TIME) {
823             rrd_set_error("specifying time relative to the 'start' "
824                             "or 'end' makes no sense here: %s", updvals[0]);
825             return -1;
826         }
827         *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
828         *current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
829     } else if (strcmp(updvals[0], "N") == 0) {
830         gettimeofday(&tmp_time, 0);
831         normalize_time(&tmp_time);
832         *current_time = tmp_time.tv_sec;
833         *current_time_usec = tmp_time.tv_usec;
834     } else {
835         old_locale = setlocale(LC_NUMERIC, "C");
836         tmp = strtod(updvals[0], 0);
837         setlocale(LC_NUMERIC, old_locale);
838         *current_time = floor(tmp);
839         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
840     }
841     /* dont do any correction for old version RRDs */
842     if (version < 3)
843         *current_time_usec = 0;
844
845     if (*current_time < rrd->live_head->last_up ||
846                     (*current_time == rrd->live_head->last_up &&
847                      (long) *current_time_usec <=
848                      (long) rrd->live_head->last_up_usec)) {
849         rrd_set_error("illegal attempt to update using time %ld when "
850                           "last update time is %ld (minimum one second step)",
851                           *current_time, rrd->live_head->last_up);
852         return -1;
853     }
854     return 0;
855 }
856
857 /*
858  * Update pdp_new by interpreting the updvals according to the DS type
859  * (COUNTER, GAUGE, etc.).
860  *
861  * Returns 0 on success, -1 on error.
862  */
863 static int update_pdp_prep(
864     rrd_t *rrd, char **updvals, 
865     rrd_value_t *pdp_new, double interval) 
866 {
867     unsigned long ds_idx; 
868     int ii;
869     char     *endptr;   /* used in the conversion */
870     double rate;
871     char     *old_locale;
872     enum dst_en dst_idx;
873
874     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
875         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
876
877         /* make sure we do not build diffs with old last_ds values */
878         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
879             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
880             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
881         }
882
883         /* NOTE: DST_CDEF should never enter this if block, because
884          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
885          * accidently specified a value for the DST_CDEF. To handle this case,
886          * an extra check is required. */
887
888         if ((updvals[ds_idx+1][0] != 'U') &&
889                         (dst_idx != DST_CDEF) &&
890                         rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
891             rate = DNAN;
892
893             /* pdp_new contains rate * time ... eg the bytes transferred during
894              * the interval. Doing it this way saves a lot of math operations
895              */
896             switch (dst_idx) {
897                     case DST_COUNTER:
898                     case DST_DERIVE:
899                             for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
900                                 if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9') 
901                                                 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
902                                     rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
903                                     return -1;
904                                 }
905                             }
906                             if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
907                                 pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
908                                 if (dst_idx == DST_COUNTER) {
909                                     /* simple overflow catcher. This will fail
910                                      * terribly for non 32 or 64 bit counters
911                                      * ... are there any others in SNMP land?
912                                      */
913                                     if (pdp_new[ds_idx] < (double) 0.0)
914                                         pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
915                                     if (pdp_new[ds_idx] < (double) 0.0)
916                                         pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
917                                 }
918                                 rate = pdp_new[ds_idx] / interval;
919                             } else {
920                                 pdp_new[ds_idx] = DNAN;
921                             }
922                             break;
923                     case DST_ABSOLUTE:
924                             old_locale = setlocale(LC_NUMERIC, "C");
925                             errno = 0;
926                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
927                             setlocale(LC_NUMERIC, old_locale);
928                             if (errno > 0) {
929                                 rrd_set_error("converting '%s' to float: %s",
930                                                 updvals[ds_idx + 1], rrd_strerror(errno));
931                                 return -1;
932                             };
933                             if (endptr[0] != '\0') {
934                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
935                                          updvals[ds_idx + 1], endptr);
936                                 return -1;
937                             }
938                             rate = pdp_new[ds_idx] / interval;
939                             break;
940                     case DST_GAUGE:
941                             errno = 0;
942                             old_locale = setlocale(LC_NUMERIC, "C");
943                             pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
944                             setlocale(LC_NUMERIC, old_locale);
945                             if (errno) {
946                                 rrd_set_error("converting '%s' to float: %s",
947                                                 updvals[ds_idx + 1], rrd_strerror(errno));
948                                 return -1;
949                             };
950                             if (endptr[0] != '\0') {
951                                 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
952                                          updvals[ds_idx + 1], endptr);
953                                 return -1;
954                             }
955                             rate = pdp_new[ds_idx] / interval;
956                             break;
957                     default:
958                             rrd_set_error("rrd contains unknown DS type : '%s'",
959                                             rrd->ds_def[ds_idx].dst);
960                             return -1;
961             }
962             /* break out of this for loop if the error string is set */
963             if (rrd_test_error()) {
964                 return -1;
965             }
966             /* make sure pdp_temp is neither too large or too small
967              * if any of these occur it becomes unknown ...
968              * sorry folks ... */
969             if (!isnan(rate) &&
970                             ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
971                               rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
972                              (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
973                               rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
974                 pdp_new[ds_idx] = DNAN;
975             }
976         } else {
977             /* no news is news all the same */
978             pdp_new[ds_idx] = DNAN;
979         }
980
981
982         /* make a copy of the command line argument for the next run */
983 #ifdef DEBUG
984         fprintf(stderr, "prep ds[%lu]\t"
985                         "last_arg '%s'\t"
986                         "this_arg '%s'\t"
987                         "pdp_new %10.2f\n",
988                         ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
989 #endif
990         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
991         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
992     }
993     return 0;
994 }
995
996 /*
997  * How many PDP steps have elapsed since the last update? Returns the answer,
998  * and stores the time between the last update and the last PDP in pre_time,
999  * and the time between the last PDP and the current time in post_int.
1000  */
1001 static int calculate_elapsed_steps(
1002     rrd_t *rrd, 
1003     unsigned long current_time, 
1004     unsigned long current_time_usec,
1005     double interval,
1006     double *pre_int,
1007     double *post_int,
1008     unsigned long *proc_pdp_cnt) 
1009 {
1010     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1011     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1012                                  * time */
1013     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1014                                  * when it was last updated */
1015     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1016
1017     /* when was the current pdp started */
1018     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1019     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1020
1021     /* when did the last pdp_st occur */
1022     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1023     occu_pdp_st = current_time - occu_pdp_age;
1024
1025     if (occu_pdp_st > proc_pdp_st) {
1026         /* OK we passed the pdp_st moment */
1027         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;  /* how much of the input data
1028                                                                  * occurred before the latest
1029                                                                  * pdp_st moment*/
1030         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f;  /* adjust usecs */
1031         *post_int = occu_pdp_age;    /* how much after it */
1032         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1033     } else {
1034         *pre_int = interval;
1035         *post_int = 0;
1036     }
1037
1038     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1039
1040 #ifdef DEBUG
1041     printf("proc_pdp_age %lu\t"
1042                     "proc_pdp_st %lu\t"
1043                     "occu_pfp_age %lu\t"
1044                     "occu_pdp_st %lu\t"
1045                     "int %lf\t"
1046                     "pre_int %lf\t"
1047                     "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1048                     occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1049 #endif
1050
1051     /* compute the number of elapsed pdp_st moments */
1052     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1053 }
1054
1055 /*
1056  * Increment the PDP values by the values in pdp_new, or else initialize them.
1057  */
1058 static void simple_update(
1059     rrd_t *rrd, double interval, rrd_value_t *pdp_new) 
1060 {
1061     int i;
1062     for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1063         if (isnan(pdp_new[i])) {
1064             /* this is not really accurate if we use subsecond data arrival time
1065                should have thought of it when going subsecond resolution ...
1066                sorry next format change we will have it! */
1067             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1068         } else {
1069             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1070                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1071             } else {
1072                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1073             }
1074         }
1075 #ifdef DEBUG
1076         fprintf(stderr,
1077                         "NO PDP  ds[%i]\t"
1078                         "value %10.2f\t"
1079                         "unkn_sec %5lu\n",
1080                         i,
1081                         rrd->pdp_prep[i].scratch[PDP_val].u_val,
1082                         rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1083 #endif
1084     }
1085 }
1086
1087 /*
1088  * Call process_pdp_st for each DS.
1089  *
1090  * Returns 0 on success, -1 on error.
1091  */
1092 static int process_all_pdp_st(
1093     rrd_t *rrd, double interval, double pre_int, double post_int, 
1094     unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1095 {
1096     unsigned long ds_idx;
1097     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1098        rate*seconds which occurred up to the last run.
1099        pdp_new[] contains rate*seconds from the latest run.
1100        pdp_temp[] will contain the rate for cdp */
1101
1102     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1103         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1104                                 elapsed_pdp_st * rrd->stat_head->pdp_step, 
1105                                 pdp_new, pdp_temp) == -1) {
1106             return -1;
1107         }
1108 #ifdef DEBUG
1109         fprintf(stderr, "PDP UPD ds[%lu]\t"
1110                         "pdp_temp %10.2f\t"
1111                         "new_prep %10.2f\t"
1112                         "new_unkn_sec %5lu\n",
1113                         ds_idx, pdp_temp[ds_idx],
1114                         rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1115                         rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1116 #endif
1117     }
1118     return 0;
1119 }
1120
1121 /*
1122  * Process an update that occurs after one of the PDP moments.
1123  * Increments the PDP value, sets NAN if time greater than the
1124  * heartbeats have elapsed, processes CDEFs.
1125  *
1126  * Returns 0 on success, -1 on error.
1127  */
1128 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, 
1129     double pre_int, double post_int, long diff_pdp_st, 
1130     rrd_value_t *pdp_new, rrd_value_t *pdp_temp) 
1131 {
1132     int i;
1133     /* update pdp_prep to the current pdp_st. */
1134     double    pre_unknown = 0.0;
1135     unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1136     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1137
1138     rpnstack_t     rpnstack;    /* used for COMPUTE DS */
1139     rpnstack_init(&rpnstack);
1140
1141
1142     if (isnan(pdp_new[ds_idx])) {
1143         /* a final bit of unknown to be added bevore calculation
1144            we use a temporary variable for this so that we
1145            don't have to turn integer lines before using the value */
1146         pre_unknown = pre_int;
1147     } else {
1148         if (isnan(scratch[PDP_val].u_val)) {
1149             scratch[PDP_val].u_val = 0;
1150         } 
1151         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1152     }
1153
1154     /* if too much of the pdp_prep is unknown we dump it */
1155     /* if the interval is larger thatn mrhb we get NAN */
1156     if ((interval > mrhb) ||
1157                     (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1158         pdp_temp[ds_idx] = DNAN;
1159     } else {
1160         pdp_temp[ds_idx] = scratch[PDP_val].u_val / 
1161                 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1162     }
1163
1164     /* process CDEF data sources; remember each CDEF DS can
1165      * only reference other DS with a lower index number */
1166     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1167         rpnp_t   *rpnp;
1168
1169         rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1170         /* substitute data values for OP_VARIABLE nodes */
1171         for (i = 0; rpnp[i].op != OP_END; i++) {
1172             if (rpnp[i].op == OP_VARIABLE) {
1173                 rpnp[i].op = OP_NUMBER;
1174                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1175             }
1176         }
1177         /* run the rpn calculator */
1178         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1179             free(rpnp);
1180             rpnstack_free(&rpnstack);
1181             return -1;
1182         }
1183     }
1184
1185     /* make pdp_prep ready for the next run */
1186     if (isnan(pdp_new[ds_idx])) {
1187         /* this is not realy accurate if we use subsecond data arival time
1188            should have thought of it when going subsecond resolution ...
1189            sorry next format change we will have it! */
1190         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1191         scratch[PDP_val].u_val = DNAN;
1192     } else {
1193         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1194         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1195     }
1196     rpnstack_free(&rpnstack);
1197     return 0;
1198 }
1199
1200 /*
1201  * Iterate over all the RRAs for a given DS and:
1202  * 1. Decide whether to schedule a smooth later
1203  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1204  * 3. Update the CDP
1205  *
1206  * Returns 0 on success, -1 on error
1207  */
1208 static int update_all_cdp_prep(
1209     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
1210     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
1211     rrd_value_t  **last_seasonal_coef, rrd_value_t  **seasonal_coef,
1212     rrd_value_t *pdp_temp, unsigned long *rra_current, 
1213     unsigned long *skip_update, int *schedule_smooth)
1214 {
1215     unsigned long rra_idx;
1216     /* index into the CDP scratch array */
1217     enum cf_en current_cf;
1218     unsigned long rra_start;
1219     /* number of rows to be updated in an RRA for a data value. */
1220     unsigned long start_pdp_offset;
1221
1222     rra_start = rra_begin;
1223     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1224         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1225         start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1226         skip_update[rra_idx] = 0;
1227         if (start_pdp_offset <= elapsed_pdp_st) {
1228             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1229                     rrd->rra_def[rra_idx].pdp_cnt + 1;
1230         } else {
1231             rra_step_cnt[rra_idx] = 0;
1232         }
1233
1234         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1235             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1236              * so that they will be correct for the next observed value; note that for
1237              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1238              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1239             if (rra_step_cnt[rra_idx] > 1) {
1240                 skip_update[rra_idx] = 1;
1241                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1242                                 elapsed_pdp_st, last_seasonal_coef);
1243                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1244                                 elapsed_pdp_st + 1, seasonal_coef);
1245             }
1246             /* periodically run a smoother for seasonal effects */
1247             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1248 #ifdef DEBUG
1249                 fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1250                                 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st, 
1251                                 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
1252 #endif
1253                 *schedule_smooth = 1;
1254             }
1255             *rra_current = rrd_tell(rrd_file);
1256         }
1257         if (rrd_test_error())
1258             return -1;
1259
1260         if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, 
1261                                 rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef, 
1262                                 current_cf) == -1) {
1263             return -1;
1264         }
1265         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1266     }
1267     return 0;
1268 }
1269
1270 /* 
1271  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1272  */
1273 static int do_schedule_smooth(
1274     rrd_t *rrd, unsigned long rra_idx,
1275     unsigned long elapsed_pdp_st)
1276 {
1277     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1278     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1279     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1280     unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1281     unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1282
1283     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1284      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1285      * really an RRA level, not a data source within RRA level parameter, but
1286      * the rra_def is read only for rrd_update (not flushed to disk). */
1287     if (*init_seasonal > BURNIN_CYCLES) {
1288         /* someone has no doubt invented a trick to deal with this wrap around,
1289          * but at least this code is clear. */
1290         if (seasonal_smooth_idx > cur_row) {
1291             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1292              * between PDP and CDP */
1293             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1294         } 
1295         /* can't rely on negative numbers because we are working with
1296          * unsigned values */
1297         return (cur_row + elapsed_pdp_st >= row_cnt
1298                         && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1299     } 
1300     /* mark off one of the burn-in cycles */
1301     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1302 }
1303
1304 /*
1305  * For a given RRA, iterate over the data sources and call the appropriate
1306  * consolidation function.
1307  *
1308  * Returns 0 on success, -1 on error.
1309  */
1310 static int update_cdp_prep(
1311     rrd_t *rrd, 
1312     unsigned long elapsed_pdp_st, 
1313     unsigned long start_pdp_offset, 
1314     unsigned long *rra_step_cnt, 
1315     int rra_idx, 
1316     rrd_value_t *pdp_temp, 
1317     rrd_value_t *last_seasonal_coef, 
1318     rrd_value_t *seasonal_coef, 
1319     int current_cf) 
1320 {
1321     unsigned long ds_idx, cdp_idx;
1322     /* update CDP_PREP areas */
1323     /* loop over data soures within each RRA */
1324     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1325
1326         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1327
1328         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1329             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf, 
1330                             pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1331                             elapsed_pdp_st, start_pdp_offset,
1332                             rrd->rra_def[rra_idx].pdp_cnt,
1333                             rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1334         } else { 
1335             /* Nothing to consolidate if there's one PDP per CDP. However, if
1336              * we've missed some PDPs, let's update null counters etc. */
1337             if (elapsed_pdp_st > 2) {
1338                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef, 
1339                                 rra_idx, ds_idx, cdp_idx, current_cf);
1340             }
1341         }
1342
1343         if (rrd_test_error())
1344             return -1;
1345     }       /* endif data sources loop */
1346     return 0;
1347 }
1348
1349 /*
1350  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1351  * primary value, secondary value, and # of unknowns.
1352  */
1353 static void update_cdp(
1354     unival *scratch,
1355     int current_cf,
1356     rrd_value_t pdp_temp_val,
1357     unsigned long rra_step_cnt,
1358     unsigned long elapsed_pdp_st,
1359     unsigned long start_pdp_offset,
1360     unsigned long pdp_cnt,
1361     rrd_value_t xff,
1362     int i, int ii)
1363 {
1364     /* shorthand variables */
1365     rrd_value_t            *cdp_val = &scratch[CDP_val].u_val;
1366     rrd_value_t    *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1367     rrd_value_t  *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1368     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1369
1370     if (rra_step_cnt) {
1371         /* If we are in this block, as least 1 CDP value will be written to
1372          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1373          * to be written, then the "fill in" value is the CDP_secondary_val
1374          * entry. */
1375         if (isnan(pdp_temp_val)) {
1376             *cdp_unkn_pdp_cnt += start_pdp_offset;
1377             *cdp_secondary_val = DNAN;
1378         } else {
1379             /* CDP_secondary value is the RRA "fill in" value for intermediary
1380              * CDP data entries. No matter the CF, the value is the same because
1381              * the average, max, min, and last of a list of identical values is
1382              * the same, namely, the value itself. */
1383             *cdp_secondary_val = pdp_temp_val;
1384         }
1385
1386         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1387             *cdp_primary_val = DNAN;
1388             if (current_cf == CF_AVERAGE) {
1389                 *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1390                                 start_pdp_offset, pdp_cnt);
1391             } else {
1392                 *cdp_val = pdp_temp_val;
1393             }
1394         } else {
1395             initialize_cdp_val(scratch, current_cf, pdp_temp_val, 
1396                             elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1397         }   /* endif meets xff value requirement for a valid value */
1398         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1399          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1400         if (isnan(pdp_temp_val))
1401             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1402         else
1403             *cdp_unkn_pdp_cnt = 0;
1404     } else {    /* rra_step_cnt[i]  == 0 */
1405
1406 #ifdef DEBUG
1407         if (isnan(*cdp_val)) {
1408             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1409                             i, ii);
1410         } else {
1411             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1412                             i, ii, *cdp_val);
1413         }
1414 #endif
1415         if (isnan(pdp_temp_val)) {
1416             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1417         } else {
1418             *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
1419         }
1420     }
1421 }
1422
1423 /*
1424  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1425  * on the type of consolidation function.
1426  */
1427 static void initialize_cdp_val(
1428     unival *scratch, 
1429     int current_cf,
1430     rrd_value_t pdp_temp_val,
1431     unsigned long elapsed_pdp_st, 
1432     unsigned long start_pdp_offset,
1433     unsigned long pdp_cnt) 
1434 {
1435     rrd_value_t cum_val, cur_val;
1436
1437     switch (current_cf) {
1438             case CF_AVERAGE:
1439                     cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1440                     cur_val = IFDNAN(pdp_temp_val, 0.0);
1441                     scratch[CDP_primary_val].u_val =
1442                             (cum_val + cur_val * start_pdp_offset) /
1443                             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1444                     scratch[CDP_val].u_val = initialize_average_carry_over(
1445                                     pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1446                     break;
1447             case CF_MAXIMUM:
1448                     cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1449                     cur_val = IFDNAN(pdp_temp_val, -DINF);
1450 #if 0
1451 #ifdef DEBUG
1452                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1453                         fprintf(stderr,
1454                                         "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1455                                         i, ii);
1456                         exit(-1);
1457                     }
1458 #endif
1459 #endif
1460                     if (cur_val > cum_val)
1461                         scratch[CDP_primary_val].u_val = cur_val;
1462                     else
1463                         scratch[CDP_primary_val].u_val = cum_val;
1464                     /* initialize carry over value */
1465                     scratch[CDP_val].u_val = pdp_temp_val;
1466                     break;
1467             case CF_MINIMUM:
1468                     cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1469                     cur_val = IFDNAN(pdp_temp_val, DINF);
1470 #if 0
1471 #ifdef DEBUG
1472                     if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1473                         fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1474                                         i, ii);
1475                         exit(-1);
1476                     }
1477 #endif
1478 #endif
1479                     if (cur_val < cum_val)
1480                         scratch[CDP_primary_val].u_val = cur_val;
1481                     else
1482                         scratch[CDP_primary_val].u_val = cum_val;
1483                     /* initialize carry over value */
1484                     scratch[CDP_val].u_val = pdp_temp_val;
1485                     break;
1486             case CF_LAST:
1487             default:
1488                     scratch[CDP_primary_val].u_val = pdp_temp_val;
1489                     /* initialize carry over value */
1490                     scratch[CDP_val].u_val = pdp_temp_val;
1491                     break;
1492     }
1493 }
1494
1495 /*
1496  * Update the consolidation function for Holt-Winters functions as
1497  * well as other functions that don't actually consolidate multiple
1498  * PDPs.
1499  */
1500 static void reset_cdp(
1501     rrd_t *rrd,
1502     unsigned long elapsed_pdp_st, 
1503     rrd_value_t *pdp_temp,
1504     rrd_value_t *last_seasonal_coef,
1505     rrd_value_t *seasonal_coef,
1506     int rra_idx, int ds_idx, int cdp_idx, 
1507     enum cf_en current_cf)
1508 {
1509     unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1510
1511     switch (current_cf) {
1512             case CF_AVERAGE:
1513             default:
1514                     scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1515                     scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1516                     break;
1517             case CF_SEASONAL:
1518             case CF_DEVSEASONAL:
1519                     /* need to update cached seasonal values, so they are consistent
1520                      * with the bulk update */
1521                     /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1522                      * CDP_last_deviation are the same. */
1523                     scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1524                     scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1525                     break;
1526             case CF_HWPREDICT:
1527             case CF_MHWPREDICT:
1528                     /* need to update the null_count and last_null_count.
1529                      * even do this for non-DNAN pdp_temp because the
1530                      * algorithm is not learning from batch updates. */
1531                     scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1532                     scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1533                     /* fall through */
1534             case CF_DEVPREDICT:
1535                     scratch[CDP_primary_val].u_val = DNAN;
1536                     scratch[CDP_secondary_val].u_val = DNAN;
1537                     break;
1538             case CF_FAILURES:
1539                     /* do not count missed bulk values as failures */
1540                     scratch[CDP_primary_val].u_val = 0;
1541                     scratch[CDP_secondary_val].u_val = 0;
1542                     /* need to reset violations buffer.
1543                      * could do this more carefully, but for now, just
1544                      * assume a bulk update wipes away all violations. */
1545                     erase_violations(rrd, cdp_idx, rra_idx);
1546                     break;
1547     }
1548 }
1549
1550 static rrd_value_t initialize_average_carry_over(
1551     rrd_value_t pdp_temp_val, 
1552     unsigned long elapsed_pdp_st,
1553     unsigned long start_pdp_offset,
1554     unsigned long pdp_cnt)
1555 {
1556     /* initialize carry over value */
1557     if (isnan(pdp_temp_val)) {
1558         return DNAN;
1559     } 
1560     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1561 }
1562
1563 /*
1564  * Update or initialize a CDP value based on the consolidation
1565  * function.
1566  *
1567  * Returns the new value.
1568  */
1569 static rrd_value_t calculate_cdp_val(
1570     rrd_value_t cdp_val,
1571     rrd_value_t pdp_temp_val,
1572     unsigned long elapsed_pdp_st,
1573     int current_cf, int i, int ii)
1574 {
1575     if (isnan(cdp_val)) {
1576         if (current_cf == CF_AVERAGE) {
1577             pdp_temp_val *= elapsed_pdp_st;
1578         }
1579 #ifdef DEBUG
1580         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1581                         i, ii, pdp_temp_val);
1582 #endif
1583         return pdp_temp_val;
1584     } 
1585     if (current_cf == CF_AVERAGE) 
1586         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1587     if (current_cf == CF_MINIMUM)
1588         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1589     if (current_cf == CF_MAXIMUM)
1590         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1591
1592     return pdp_temp_val;
1593 }
1594
1595 /*
1596  * For each RRA, update the seasonal values and then call update_aberrant_CF
1597  * for each data source.
1598  *
1599  * Return 0 on success, -1 on error.
1600  */
1601 static int update_aberrant_cdps(
1602     rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
1603     unsigned long *rra_current, unsigned long elapsed_pdp_st, 
1604     rrd_value_t *pdp_temp, rrd_value_t  **seasonal_coef) 
1605 {
1606     unsigned long rra_idx, ds_idx, j;
1607
1608     /* number of PDP steps since the last update that
1609      * are assigned to the first CDP to be generated
1610      * since the last update. */
1611     unsigned short scratch_idx;
1612     unsigned long  rra_start;
1613     enum cf_en     current_cf;
1614
1615     /* this loop is only entered if elapsed_pdp_st < 3 */
1616     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1617                     j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1618         rra_start = rra_begin;
1619         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1620             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1621                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1622                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1623                     if (scratch_idx == CDP_primary_val) {
1624                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1625                                         elapsed_pdp_st + 1, seasonal_coef);
1626                     } else {
1627                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1628                                         elapsed_pdp_st + 2, seasonal_coef);
1629                     }
1630                     *rra_current = rrd_tell(rrd_file);
1631                 }
1632                 if (rrd_test_error())
1633                     return -1;
1634                 /* loop over data soures within each RRA */
1635                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1636                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1637                                     rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
1638                                     rra_idx, ds_idx, scratch_idx, *seasonal_coef);
1639                 }
1640             }
1641             rra_start += rrd->rra_def[rra_idx].row_cnt 
1642                     * rrd->stat_head->ds_cnt 
1643                     * sizeof(rrd_value_t);
1644         }
1645     }
1646     return 0;
1647 }
1648
1649 /* 
1650  * Move sequentially through the file, writing one RRA at a time.  Note this
1651  * architecture divorces the computation of CDP with flushing updated RRA
1652  * entries to disk.
1653  *
1654  * Return 0 on success, -1 on error.
1655  */
1656 static int write_to_rras(
1657     rrd_t *rrd, 
1658     rrd_file_t *rrd_file, 
1659     unsigned long *rra_step_cnt,
1660     unsigned long rra_begin, 
1661     unsigned long *rra_current,
1662     time_t current_time,
1663     unsigned long *skip_update,
1664     info_t **pcdp_summary)
1665 {
1666     unsigned long rra_idx;
1667     unsigned long rra_start;    
1668     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1669     time_t    rra_time = 0; /* time of update for a RRA */
1670
1671     /* Ready to write to disk */
1672     rra_start = rra_begin;
1673     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1674         /* skip unless there's something to write */
1675         if (rra_step_cnt[rra_idx]) {
1676             /* write the first row */
1677 #ifdef DEBUG
1678             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1679 #endif
1680             rrd->rra_ptr[rra_idx].cur_row++;
1681             if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
1682                 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1683             /* position on the first row */
1684             rra_pos_tmp = rra_start +
1685                     (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1686                     sizeof(rrd_value_t);
1687             if (rra_pos_tmp != *rra_current) {
1688                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1689                     rrd_set_error("seek error in rrd");
1690                     return -1;
1691                 }
1692                 *rra_current = rra_pos_tmp;
1693             }
1694 #ifdef DEBUG
1695             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1696 #endif
1697             if (!skip_update[rra_idx]) {
1698                 if (*pcdp_summary != NULL) {
1699                     rra_time = (current_time - current_time
1700                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1701                                             rrd->stat_head->pdp_step))
1702                             - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
1703                                             rrd->stat_head->pdp_step);
1704                 }
1705                 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1706                                         pcdp_summary, rra_time) == -1)
1707                     return -1;
1708             }
1709
1710             /* write other rows of the bulk update, if any */
1711             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1712                 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
1713 #ifdef DEBUG
1714                     fprintf(stderr,
1715                                     "Wraparound for RRA %s, %lu updates left\n",
1716                                     rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
1717 #endif
1718                     /* wrap */
1719                     rrd->rra_ptr[rra_idx].cur_row = 0;
1720                     /* seek back to beginning of current rra */
1721                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1722                         rrd_set_error("seek error in rrd");
1723                         return -1;
1724                     }
1725 #ifdef DEBUG
1726                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1727                                     rrd_file->pos);
1728 #endif
1729                     *rra_current = rra_start;
1730                 }
1731                 if (!skip_update[rra_idx]) {
1732                     if (*pcdp_summary != NULL) {
1733                         rra_time = (current_time - current_time
1734                                         % (rrd->rra_def[rra_idx].pdp_cnt *
1735                                                 rrd->stat_head->pdp_step))
1736                                 -
1737                                 ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
1738                                  rrd->stat_head->pdp_step);
1739                     }
1740                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1741                                             CDP_secondary_val, pcdp_summary, rra_time) == -1)
1742                         return -1;
1743                 }
1744             }
1745         }
1746         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1747                 sizeof(rrd_value_t);
1748     }           /* RRA LOOP */
1749
1750     return 0;
1751 }
1752
1753 /*
1754  * Write out one row of values (one value per DS) to the archive.
1755  *
1756  * Returns 0 on success, -1 on error.
1757  */
1758 static int write_RRA_row(
1759     rrd_file_t *rrd_file,
1760     rrd_t *rrd,
1761     unsigned long rra_idx,
1762     unsigned long *rra_current,
1763     unsigned short CDP_scratch_idx,
1764     info_t **pcdp_summary,
1765     time_t rra_time)
1766 {
1767     unsigned long ds_idx, cdp_idx;
1768     infoval   iv;
1769
1770     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1771         /* compute the cdp index */
1772         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1773 #ifdef DEBUG
1774         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1775                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1776                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1777 #endif
1778         if (pcdp_summary != NULL) {
1779             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1780             /* append info to the return hash */
1781             *pcdp_summary = info_push(*pcdp_summary,
1782                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1783                            rrd->rra_def[rra_idx].cf_nam,
1784                            rrd->rra_def[rra_idx].pdp_cnt,
1785                            rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1786         }
1787         if (rrd_write(rrd_file,
1788              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1789              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1790             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1791             return -1;
1792         }
1793         *rra_current += sizeof(rrd_value_t);
1794     }
1795     return 0;
1796 }
1797
1798 /*
1799  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1800  *
1801  * Returns 0 on success, -1 otherwise
1802  */
1803 static int smooth_all_rras(
1804     rrd_t *rrd, 
1805     rrd_file_t *rrd_file, 
1806     unsigned long rra_begin) 
1807 {
1808     unsigned long rra_start = rra_begin;
1809     unsigned long rra_idx;
1810     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1811         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1812                         cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1813 #ifdef DEBUG
1814             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1815 #endif
1816             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1817             if (rrd_test_error())
1818                 return -1;
1819         }
1820         rra_start += rrd->rra_def[rra_idx].row_cnt
1821                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1822     }
1823     return 0;
1824 }
1825
1826 #ifndef HAVE_MMAP
1827 /*
1828  * Flush changes to disk (unless we're using mmap)
1829  *
1830  * Returns 0 on success, -1 otherwise
1831  */
1832 static int write_changes_to_disk(
1833     rrd_t *rrd, rrd_file_t *rrd_file, int version) 
1834 {
1835     /* we just need to write back the live header portion now*/
1836     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1837                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
1838                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
1839                  SEEK_SET) != 0) {
1840         rrd_set_error("seek rrd for live header writeback");
1841         return -1;
1842     }
1843     if (version >= 3) {
1844         if (rrd_write(rrd_file, rrd->live_head,
1845                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1846             rrd_set_error("rrd_write live_head to rrd");
1847             return -1;
1848         }
1849     } else {
1850         if (rrd_write(rrd_file, &rrd->live_head->last_up,
1851                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1852             rrd_set_error("rrd_write live_head to rrd");
1853             return -1;
1854         }
1855     }
1856
1857
1858     if (rrd_write(rrd_file, rrd->pdp_prep,
1859                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
1860         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
1861         rrd_set_error("rrd_write pdp_prep to rrd");
1862         return -1;
1863     }
1864
1865     if (rrd_write(rrd_file, rrd->cdp_prep,
1866                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1867                   rrd->stat_head->ds_cnt)
1868         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1869                       rrd->stat_head->ds_cnt)) {
1870
1871         rrd_set_error("rrd_write cdp_prep to rrd");
1872         return -1;
1873     }
1874
1875     if (rrd_write(rrd_file, rrd->rra_ptr,
1876                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
1877         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
1878         rrd_set_error("rrd_write rra_ptr to rrd");
1879         return -1;
1880     }
1881     return 0;
1882 }
1883 #endif