prepare for the release of rrdtool-1.3.2
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     rrd_info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     unsigned long *rra_current,
99     time_t *current_time,
100     unsigned long *current_time_usec,
101     rrd_value_t *pdp_temp,
102     rrd_value_t *pdp_new,
103     unsigned long *rra_step_cnt,
104     char **updvals,
105     long *tmpl_idx,
106     unsigned long tmpl_cnt,
107     rrd_info_t ** pcdp_summary,
108     int version,
109     unsigned long *skip_update,
110     int *schedule_smooth);
111
112 static int parse_ds(
113     rrd_t *rrd,
114     char **updvals,
115     long *tmpl_idx,
116     char *input,
117     unsigned long tmpl_cnt,
118     time_t *current_time,
119     unsigned long *current_time_usec,
120     int version);
121
122 static int get_time_from_reading(
123     rrd_t *rrd,
124     char timesyntax,
125     char **updvals,
126     time_t *current_time,
127     unsigned long *current_time_usec,
128     int version);
129
130 static int update_pdp_prep(
131     rrd_t *rrd,
132     char **updvals,
133     rrd_value_t *pdp_new,
134     double interval);
135
136 static int calculate_elapsed_steps(
137     rrd_t *rrd,
138     unsigned long current_time,
139     unsigned long current_time_usec,
140     double interval,
141     double *pre_int,
142     double *post_int,
143     unsigned long *proc_pdp_cnt);
144
145 static void simple_update(
146     rrd_t *rrd,
147     double interval,
148     rrd_value_t *pdp_new);
149
150 static int process_all_pdp_st(
151     rrd_t *rrd,
152     double interval,
153     double pre_int,
154     double post_int,
155     unsigned long elapsed_pdp_st,
156     rrd_value_t *pdp_new,
157     rrd_value_t *pdp_temp);
158
159 static int process_pdp_st(
160     rrd_t *rrd,
161     unsigned long ds_idx,
162     double interval,
163     double pre_int,
164     double post_int,
165     long diff_pdp_st,
166     rrd_value_t *pdp_new,
167     rrd_value_t *pdp_temp);
168
169 static int update_all_cdp_prep(
170     rrd_t *rrd,
171     unsigned long *rra_step_cnt,
172     unsigned long rra_begin,
173     rrd_file_t *rrd_file,
174     unsigned long elapsed_pdp_st,
175     unsigned long proc_pdp_cnt,
176     rrd_value_t **last_seasonal_coef,
177     rrd_value_t **seasonal_coef,
178     rrd_value_t *pdp_temp,
179     unsigned long *rra_current,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long *rra_current,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     unsigned long *rra_current,
259     time_t current_time,
260     unsigned long *skip_update,
261     rrd_info_t ** pcdp_summary);
262
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned long *rra_current,
268     unsigned short CDP_scratch_idx,
269     rrd_info_t ** pcdp_summary,
270     time_t rra_time);
271
272 static int smooth_all_rras(
273     rrd_t *rrd,
274     rrd_file_t *rrd_file,
275     unsigned long rra_begin);
276
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279     rrd_t *rrd,
280     rrd_file_t *rrd_file,
281     int version);
282 #endif
283
284 /*
285  * normalize time as returned by gettimeofday. usec part must
286  * be always >= 0
287  */
288 static inline void normalize_time(
289     struct timeval *t)
290 {
291     if (t->tv_usec < 0) {
292         t->tv_sec--;
293         t->tv_usec += 1e6L;
294     }
295 }
296
297 /*
298  * Sets current_time and current_time_usec based on the current time.
299  * current_time_usec is set to 0 if the version number is 1 or 2.
300  */
301 static inline void initialize_time(
302     time_t *current_time,
303     unsigned long *current_time_usec,
304     int version)
305 {
306     struct timeval tmp_time;    /* used for time conversion */
307
308     gettimeofday(&tmp_time, 0);
309     normalize_time(&tmp_time);
310     *current_time = tmp_time.tv_sec;
311     if (version >= 3) {
312         *current_time_usec = tmp_time.tv_usec;
313     } else {
314         *current_time_usec = 0;
315     }
316 }
317
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319
320 rrd_info_t *rrd_update_v(
321     int argc,
322     char **argv)
323 {
324     char     *tmplt = NULL;
325     rrd_info_t *result = NULL;
326     rrd_infoval_t rc;
327     struct option long_options[] = {
328         {"template", required_argument, 0, 't'},
329         {0, 0, 0, 0}
330     };
331
332     rc.u_int = -1;
333     optind = 0;
334     opterr = 0;         /* initialize getopt */
335
336     while (1) {
337         int       option_index = 0;
338         int       opt;
339
340         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341
342         if (opt == EOF)
343             break;
344
345         switch (opt) {
346         case 't':
347             tmplt = optarg;
348             break;
349
350         case '?':
351             rrd_set_error("unknown option '%s'", argv[optind - 1]);
352             goto end_tag;
353         }
354     }
355
356     /* need at least 2 arguments: filename, data. */
357     if (argc - optind < 2) {
358         rrd_set_error("Not enough arguments");
359         goto end_tag;
360     }
361     rc.u_int = 0;
362     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363     rc.u_int = _rrd_update(argv[optind], tmplt,
364                            argc - optind - 1,
365                            (const char **) (argv + optind + 1), result);
366     result->value.u_int = rc.u_int;
367   end_tag:
368     return result;
369 }
370
371 int rrd_update(
372     int argc,
373     char **argv)
374 {
375     struct option long_options[] = {
376         {"template", required_argument, 0, 't'},
377         {0, 0, 0, 0}
378     };
379     int       option_index = 0;
380     int       opt;
381     char     *tmplt = NULL;
382     int       rc = -1;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case '?':
399             rrd_set_error("unknown option '%s'", argv[optind - 1]);
400             goto out;
401         }
402     }
403
404     /* need at least 2 arguments: filename, data. */
405     if (argc - optind < 2) {
406         rrd_set_error("Not enough arguments");
407         goto out;
408     }
409
410     rc = rrd_update_r(argv[optind], tmplt,
411                       argc - optind - 1, (const char **) (argv + optind + 1));
412   out:
413     free(tmplt);
414     return rc;
415 }
416
417 int rrd_update_r(
418     const char *filename,
419     const char *tmplt,
420     int argc,
421     const char **argv)
422 {
423     return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
425
426 int _rrd_update(
427     const char *filename,
428     const char *tmplt,
429     int argc,
430     const char **argv,
431     rrd_info_t * pcdp_summary)
432 {
433
434     int       arg_i = 2;
435
436     unsigned long rra_begin;    /* byte pointer to the rra
437                                  * area in the rrd file.  this
438                                  * pointer never changes value */
439     unsigned long rra_current;  /* byte pointer to the current write
440                                  * spot in the rrd file. */
441     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
442                              * to the existing entry */
443     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
444                              * to the cdp values */
445
446     long     *tmpl_idx; /* index representing the settings
447                          * transported by the tmplt index */
448     unsigned long tmpl_cnt = 2; /* time and data */
449     rrd_t     rrd;
450     time_t    current_time = 0;
451     unsigned long current_time_usec = 0;    /* microseconds part of current time */
452     char    **updvals;
453     int       schedule_smooth = 0;
454
455     /* number of elapsed PDP steps since last update */
456     unsigned long *rra_step_cnt = NULL;
457
458     int       version;  /* rrd version */
459     rrd_file_t *rrd_file;
460     char     *arg_copy; /* for processing the argv */
461     unsigned long *skip_update; /* RRAs to advance but not write */
462
463     /* need at least 1 arguments: data. */
464     if (argc < 1) {
465         rrd_set_error("Not enough arguments");
466         goto err_out;
467     }
468
469     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470         goto err_free;
471     }
472     /* We are now at the beginning of the rra's */
473     rra_current = rra_begin = rrd_file->header_len;
474
475     version = atoi(rrd.stat_head->version);
476
477     initialize_time(&current_time, &current_time_usec, version);
478
479     /* get exclusive lock to whole file.
480      * lock gets removed when we close the file.
481      */
482     if (rrd_lock(rrd_file) != 0) {
483         rrd_set_error("could not lock RRD");
484         goto err_close;
485     }
486
487     if (allocate_data_structures(&rrd, &updvals,
488                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489                                  &rra_step_cnt, &skip_update,
490                                  &pdp_new) == -1) {
491         goto err_close;
492     }
493
494     /* loop through the arguments. */
495     for (arg_i = 0; arg_i < argc; arg_i++) {
496         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497             rrd_set_error("failed duplication argv entry");
498             break;
499         }
500         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501                         &current_time, &current_time_usec, pdp_temp, pdp_new,
502                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503                         &pcdp_summary, version, skip_update,
504                         &schedule_smooth) == -1) {
505             if (rrd_test_error()) { /* Should have error string always here */
506                 char     *save_error;
507
508                 /* Prepend file name to error message */
509                 if ((save_error = strdup(rrd_get_error())) != NULL) {
510                     rrd_set_error("%s: %s", filename, save_error);
511                     free(save_error);
512                 }
513             }
514             free(arg_copy);
515             break;
516         }
517         free(arg_copy);
518     }
519
520     free(rra_step_cnt);
521
522     /* if we got here and if there is an error and if the file has not been
523      * written to, then close things up and return. */
524     if (rrd_test_error()) {
525         goto err_free_structures;
526     }
527 #ifndef HAVE_MMAP
528     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
529         goto err_free_structures;
530     }
531 #endif
532
533     /* calling the smoothing code here guarantees at most one smoothing
534      * operation per rrd_update call. Unfortunately, it is possible with bulk
535      * updates, or a long-delayed update for smoothing to occur off-schedule.
536      * This really isn't critical except during the burn-in cycles. */
537     if (schedule_smooth) {
538         smooth_all_rras(&rrd, rrd_file, rra_begin);
539     }
540
541 /*    rrd_dontneed(rrd_file,&rrd); */
542     rrd_free(&rrd);
543     rrd_close(rrd_file);
544
545     free(pdp_new);
546     free(tmpl_idx);
547     free(pdp_temp);
548     free(skip_update);
549     free(updvals);
550     return 0;
551
552   err_free_structures:
553     free(pdp_new);
554     free(tmpl_idx);
555     free(pdp_temp);
556     free(skip_update);
557     free(updvals);
558   err_close:
559     rrd_close(rrd_file);
560   err_free:
561     rrd_free(&rrd);
562   err_out:
563     return -1;
564 }
565
566 /*
567  * get exclusive lock to whole file.
568  * lock gets removed when we close the file
569  *
570  * returns 0 on success
571  */
572 int rrd_lock(
573     rrd_file_t *file)
574 {
575     int       rcstat;
576
577     {
578 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
579         struct _stat st;
580
581         if (_fstat(file->fd, &st) == 0) {
582             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
583         } else {
584             rcstat = -1;
585         }
586 #else
587         struct flock lock;
588
589         lock.l_type = F_WRLCK;  /* exclusive write lock */
590         lock.l_len = 0; /* whole file */
591         lock.l_start = 0;   /* start of file */
592         lock.l_whence = SEEK_SET;   /* end of file */
593
594         rcstat = fcntl(file->fd, F_SETLK, &lock);
595 #endif
596     }
597
598     return (rcstat);
599 }
600
601 /*
602  * Allocate some important arrays used, and initialize the template.
603  *
604  * When it returns, either all of the structures are allocated
605  * or none of them are.
606  *
607  * Returns 0 on success, -1 on error.
608  */
609 static int allocate_data_structures(
610     rrd_t *rrd,
611     char ***updvals,
612     rrd_value_t **pdp_temp,
613     const char *tmplt,
614     long **tmpl_idx,
615     unsigned long *tmpl_cnt,
616     unsigned long **rra_step_cnt,
617     unsigned long **skip_update,
618     rrd_value_t **pdp_new)
619 {
620     unsigned  i, ii;
621     if ((*updvals = (char **) malloc(sizeof(char *)
622                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
623         rrd_set_error("allocating updvals pointer array.");
624         return -1;
625     }
626     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
627                                             * rrd->stat_head->ds_cnt)) ==
628         NULL) {
629         rrd_set_error("allocating pdp_temp.");
630         goto err_free_updvals;
631     }
632     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
633                                                  *
634                                                  rrd->stat_head->rra_cnt)) ==
635         NULL) {
636         rrd_set_error("allocating skip_update.");
637         goto err_free_pdp_temp;
638     }
639     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
640                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641         rrd_set_error("allocating tmpl_idx.");
642         goto err_free_skip_update;
643     }
644     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
645                                                   *
646                                                   (rrd->stat_head->
647                                                    rra_cnt))) == NULL) {
648         rrd_set_error("allocating rra_step_cnt.");
649         goto err_free_tmpl_idx;
650     }
651
652     /* initialize tmplt redirector */
653     /* default config example (assume DS 1 is a CDEF DS)
654        tmpl_idx[0] -> 0; (time)
655        tmpl_idx[1] -> 1; (DS 0)
656        tmpl_idx[2] -> 3; (DS 2)
657        tmpl_idx[3] -> 4; (DS 3) */
658     (*tmpl_idx)[0] = 0; /* time */
659     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
660         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
661             (*tmpl_idx)[ii++] = i;
662     }
663     *tmpl_cnt = ii;
664
665     if (tmplt != NULL) {
666         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
667             goto err_free_rra_step_cnt;
668         }
669     }
670
671     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
672                                            * rrd->stat_head->ds_cnt)) == NULL) {
673         rrd_set_error("allocating pdp_new.");
674         goto err_free_rra_step_cnt;
675     }
676
677     return 0;
678
679   err_free_rra_step_cnt:
680     free(*rra_step_cnt);
681   err_free_tmpl_idx:
682     free(*tmpl_idx);
683   err_free_skip_update:
684     free(*skip_update);
685   err_free_pdp_temp:
686     free(*pdp_temp);
687   err_free_updvals:
688     free(*updvals);
689     return -1;
690 }
691
692 /*
693  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
694  *
695  * Returns 0 on success.
696  */
697 static int parse_template(
698     rrd_t *rrd,
699     const char *tmplt,
700     unsigned long *tmpl_cnt,
701     long *tmpl_idx)
702 {
703     char     *dsname, *tmplt_copy;
704     unsigned int tmpl_len, i;
705     int       ret = 0;
706
707     *tmpl_cnt = 1;      /* the first entry is the time */
708
709     /* we should work on a writeable copy here */
710     if ((tmplt_copy = strdup(tmplt)) == NULL) {
711         rrd_set_error("error copying tmplt '%s'", tmplt);
712         ret = -1;
713         goto out;
714     }
715
716     dsname = tmplt_copy;
717     tmpl_len = strlen(tmplt_copy);
718     for (i = 0; i <= tmpl_len; i++) {
719         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
720             tmplt_copy[i] = '\0';
721             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
722                 rrd_set_error("tmplt contains more DS definitions than RRD");
723                 ret = -1;
724                 goto out_free_tmpl_copy;
725             }
726             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
727                 rrd_set_error("unknown DS name '%s'", dsname);
728                 ret = -1;
729                 goto out_free_tmpl_copy;
730             }
731             /* go to the next entry on the tmplt_copy */
732             if (i < tmpl_len)
733                 dsname = &tmplt_copy[i + 1];
734         }
735     }
736   out_free_tmpl_copy:
737     free(tmplt_copy);
738   out:
739     return ret;
740 }
741
742 /*
743  * Parse an update string, updates the primary data points (PDPs)
744  * and consolidated data points (CDPs), and writes changes to the RRAs.
745  *
746  * Returns 0 on success, -1 on error.
747  */
748 static int process_arg(
749     char *step_start,
750     rrd_t *rrd,
751     rrd_file_t *rrd_file,
752     unsigned long rra_begin,
753     unsigned long *rra_current,
754     time_t *current_time,
755     unsigned long *current_time_usec,
756     rrd_value_t *pdp_temp,
757     rrd_value_t *pdp_new,
758     unsigned long *rra_step_cnt,
759     char **updvals,
760     long *tmpl_idx,
761     unsigned long tmpl_cnt,
762     rrd_info_t ** pcdp_summary,
763     int version,
764     unsigned long *skip_update,
765     int *schedule_smooth)
766 {
767     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
768
769     /* a vector of future Holt-Winters seasonal coefs */
770     unsigned long elapsed_pdp_st;
771
772     double    interval, pre_int, post_int;  /* interval between this and
773                                              * the last run */
774     unsigned long proc_pdp_cnt;
775     unsigned long rra_start;
776
777     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
778                  current_time, current_time_usec, version) == -1) {
779         return -1;
780     }
781     /* seek to the beginning of the rra's */
782     if (*rra_current != rra_begin) {
783         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
784             rrd_set_error("seek error in rrd");
785             return -1;
786         }
787         *rra_current = rra_begin;
788     }
789     rra_start = rra_begin;
790
791     interval = (double) (*current_time - rrd->live_head->last_up)
792         + (double) ((long) *current_time_usec -
793                     (long) rrd->live_head->last_up_usec) / 1e6f;
794
795     /* process the data sources and update the pdp_prep 
796      * area accordingly */
797     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
798         return -1;
799     }
800
801     elapsed_pdp_st = calculate_elapsed_steps(rrd,
802                                              *current_time,
803                                              *current_time_usec, interval,
804                                              &pre_int, &post_int,
805                                              &proc_pdp_cnt);
806
807     /* has a pdp_st moment occurred since the last run ? */
808     if (elapsed_pdp_st == 0) {
809         /* no we have not passed a pdp_st moment. therefore update is simple */
810         simple_update(rrd, interval, pdp_new);
811     } else {
812         /* an pdp_st has occurred. */
813         if (process_all_pdp_st(rrd, interval,
814                                pre_int, post_int,
815                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
816             return -1;
817         }
818         if (update_all_cdp_prep(rrd, rra_step_cnt,
819                                 rra_begin, rrd_file,
820                                 elapsed_pdp_st,
821                                 proc_pdp_cnt,
822                                 &last_seasonal_coef,
823                                 &seasonal_coef,
824                                 pdp_temp, rra_current,
825                                 skip_update, schedule_smooth) == -1) {
826             goto err_free_coefficients;
827         }
828         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
829                                  elapsed_pdp_st, pdp_temp,
830                                  &seasonal_coef) == -1) {
831             goto err_free_coefficients;
832         }
833         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
834                           rra_current, *current_time, skip_update,
835                           pcdp_summary) == -1) {
836             goto err_free_coefficients;
837         }
838     }                   /* endif a pdp_st has occurred */
839     rrd->live_head->last_up = *current_time;
840     rrd->live_head->last_up_usec = *current_time_usec;
841
842     if (version < 3) {
843         *rrd->legacy_last_up = rrd->live_head->last_up;
844     }
845     free(seasonal_coef);
846     free(last_seasonal_coef);
847     return 0;
848
849   err_free_coefficients:
850     free(seasonal_coef);
851     free(last_seasonal_coef);
852     return -1;
853 }
854
855 /*
856  * Parse a DS string (time + colon-separated values), storing the
857  * results in current_time, current_time_usec, and updvals.
858  *
859  * Returns 0 on success, -1 on error.
860  */
861 static int parse_ds(
862     rrd_t *rrd,
863     char **updvals,
864     long *tmpl_idx,
865     char *input,
866     unsigned long tmpl_cnt,
867     time_t *current_time,
868     unsigned long *current_time_usec,
869     int version)
870 {
871     char     *p;
872     unsigned long i;
873     char      timesyntax;
874
875     updvals[0] = input;
876     /* initialize all ds input to unknown except the first one
877        which has always got to be set */
878     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
879         updvals[i] = "U";
880
881     /* separate all ds elements; first must be examined separately
882        due to alternate time syntax */
883     if ((p = strchr(input, '@')) != NULL) {
884         timesyntax = '@';
885     } else if ((p = strchr(input, ':')) != NULL) {
886         timesyntax = ':';
887     } else {
888         rrd_set_error("expected timestamp not found in data source from %s",
889                       input);
890         return -1;
891     }
892     *p = '\0';
893     i = 1;
894     updvals[tmpl_idx[i++]] = p + 1;
895     while (*(++p)) {
896         if (*p == ':') {
897             *p = '\0';
898             if (i < tmpl_cnt) {
899                 updvals[tmpl_idx[i++]] = p + 1;
900             }
901         }
902     }
903
904     if (i != tmpl_cnt) {
905         rrd_set_error("expected %lu data source readings (got %lu) from %s",
906                       tmpl_cnt - 1, i, input);
907         return -1;
908     }
909
910     if (get_time_from_reading(rrd, timesyntax, updvals,
911                               current_time, current_time_usec,
912                               version) == -1) {
913         return -1;
914     }
915     return 0;
916 }
917
918 /*
919  * Parse the time in a DS string, store it in current_time and 
920  * current_time_usec and verify that it's later than the last
921  * update for this DS.
922  *
923  * Returns 0 on success, -1 on error.
924  */
925 static int get_time_from_reading(
926     rrd_t *rrd,
927     char timesyntax,
928     char **updvals,
929     time_t *current_time,
930     unsigned long *current_time_usec,
931     int version)
932 {
933     double    tmp;
934     char     *parsetime_error = NULL;
935     char     *old_locale;
936     rrd_time_value_t ds_tv;
937     struct timeval tmp_time;    /* used for time conversion */
938
939     /* get the time from the reading ... handle N */
940     if (timesyntax == '@') {    /* at-style */
941         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
942             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
943             return -1;
944         }
945         if (ds_tv.type == RELATIVE_TO_END_TIME ||
946             ds_tv.type == RELATIVE_TO_START_TIME) {
947             rrd_set_error("specifying time relative to the 'start' "
948                           "or 'end' makes no sense here: %s", updvals[0]);
949             return -1;
950         }
951         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
952         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
953     } else if (strcmp(updvals[0], "N") == 0) {
954         gettimeofday(&tmp_time, 0);
955         normalize_time(&tmp_time);
956         *current_time = tmp_time.tv_sec;
957         *current_time_usec = tmp_time.tv_usec;
958     } else {
959         old_locale = setlocale(LC_NUMERIC, "C");
960         tmp = strtod(updvals[0], 0);
961         setlocale(LC_NUMERIC, old_locale);
962         *current_time = floor(tmp);
963         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
964     }
965     /* dont do any correction for old version RRDs */
966     if (version < 3)
967         *current_time_usec = 0;
968
969     if (*current_time < rrd->live_head->last_up ||
970         (*current_time == rrd->live_head->last_up &&
971          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
972         rrd_set_error("illegal attempt to update using time %ld when "
973                       "last update time is %ld (minimum one second step)",
974                       *current_time, rrd->live_head->last_up);
975         return -1;
976     }
977     return 0;
978 }
979
980 /*
981  * Update pdp_new by interpreting the updvals according to the DS type
982  * (COUNTER, GAUGE, etc.).
983  *
984  * Returns 0 on success, -1 on error.
985  */
986 static int update_pdp_prep(
987     rrd_t *rrd,
988     char **updvals,
989     rrd_value_t *pdp_new,
990     double interval)
991 {
992     unsigned long ds_idx;
993     int       ii;
994     char     *endptr;   /* used in the conversion */
995     double    rate;
996     char     *old_locale;
997     enum dst_en dst_idx;
998
999     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1000         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1001
1002         /* make sure we do not build diffs with old last_ds values */
1003         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1004             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1005             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1006         }
1007
1008         /* NOTE: DST_CDEF should never enter this if block, because
1009          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1010          * accidently specified a value for the DST_CDEF. To handle this case,
1011          * an extra check is required. */
1012
1013         if ((updvals[ds_idx + 1][0] != 'U') &&
1014             (dst_idx != DST_CDEF) &&
1015             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1016             rate = DNAN;
1017
1018             /* pdp_new contains rate * time ... eg the bytes transferred during
1019              * the interval. Doing it this way saves a lot of math operations
1020              */
1021             switch (dst_idx) {
1022             case DST_COUNTER:
1023             case DST_DERIVE:
1024                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1025                     if ((updvals[ds_idx + 1][ii] < '0'
1026                          || updvals[ds_idx + 1][ii] > '9')
1027                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1028                         rrd_set_error("not a simple integer: '%s'",
1029                                       updvals[ds_idx + 1]);
1030                         return -1;
1031                     }
1032                 }
1033                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1034                     pdp_new[ds_idx] =
1035                         rrd_diff(updvals[ds_idx + 1],
1036                                  rrd->pdp_prep[ds_idx].last_ds);
1037                     if (dst_idx == DST_COUNTER) {
1038                         /* simple overflow catcher. This will fail
1039                          * terribly for non 32 or 64 bit counters
1040                          * ... are there any others in SNMP land?
1041                          */
1042                         if (pdp_new[ds_idx] < (double) 0.0)
1043                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1044                         if (pdp_new[ds_idx] < (double) 0.0)
1045                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1046                     }
1047                     rate = pdp_new[ds_idx] / interval;
1048                 } else {
1049                     pdp_new[ds_idx] = DNAN;
1050                 }
1051                 break;
1052             case DST_ABSOLUTE:
1053                 old_locale = setlocale(LC_NUMERIC, "C");
1054                 errno = 0;
1055                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1056                 setlocale(LC_NUMERIC, old_locale);
1057                 if (errno > 0) {
1058                     rrd_set_error("converting '%s' to float: %s",
1059                                   updvals[ds_idx + 1], rrd_strerror(errno));
1060                     return -1;
1061                 };
1062                 if (endptr[0] != '\0') {
1063                     rrd_set_error
1064                         ("conversion of '%s' to float not complete: tail '%s'",
1065                          updvals[ds_idx + 1], endptr);
1066                     return -1;
1067                 }
1068                 rate = pdp_new[ds_idx] / interval;
1069                 break;
1070             case DST_GAUGE:
1071                 errno = 0;
1072                 old_locale = setlocale(LC_NUMERIC, "C");
1073                 pdp_new[ds_idx] =
1074                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1075                 setlocale(LC_NUMERIC, old_locale);
1076                 if (errno) {
1077                     rrd_set_error("converting '%s' to float: %s",
1078                                   updvals[ds_idx + 1], rrd_strerror(errno));
1079                     return -1;
1080                 };
1081                 if (endptr[0] != '\0') {
1082                     rrd_set_error
1083                         ("conversion of '%s' to float not complete: tail '%s'",
1084                          updvals[ds_idx + 1], endptr);
1085                     return -1;
1086                 }
1087                 rate = pdp_new[ds_idx] / interval;
1088                 break;
1089             default:
1090                 rrd_set_error("rrd contains unknown DS type : '%s'",
1091                               rrd->ds_def[ds_idx].dst);
1092                 return -1;
1093             }
1094             /* break out of this for loop if the error string is set */
1095             if (rrd_test_error()) {
1096                 return -1;
1097             }
1098             /* make sure pdp_temp is neither too large or too small
1099              * if any of these occur it becomes unknown ...
1100              * sorry folks ... */
1101             if (!isnan(rate) &&
1102                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1103                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1104                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1105                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1106                 pdp_new[ds_idx] = DNAN;
1107             }
1108         } else {
1109             /* no news is news all the same */
1110             pdp_new[ds_idx] = DNAN;
1111         }
1112
1113
1114         /* make a copy of the command line argument for the next run */
1115 #ifdef DEBUG
1116         fprintf(stderr, "prep ds[%lu]\t"
1117                 "last_arg '%s'\t"
1118                 "this_arg '%s'\t"
1119                 "pdp_new %10.2f\n",
1120                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1121                 pdp_new[ds_idx]);
1122 #endif
1123         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1124                 LAST_DS_LEN - 1);
1125         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1126     }
1127     return 0;
1128 }
1129
1130 /*
1131  * How many PDP steps have elapsed since the last update? Returns the answer,
1132  * and stores the time between the last update and the last PDP in pre_time,
1133  * and the time between the last PDP and the current time in post_int.
1134  */
1135 static int calculate_elapsed_steps(
1136     rrd_t *rrd,
1137     unsigned long current_time,
1138     unsigned long current_time_usec,
1139     double interval,
1140     double *pre_int,
1141     double *post_int,
1142     unsigned long *proc_pdp_cnt)
1143 {
1144     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1145     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1146                                  * time */
1147     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1148                                  * when it was last updated */
1149     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1150
1151     /* when was the current pdp started */
1152     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1153     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1154
1155     /* when did the last pdp_st occur */
1156     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1157     occu_pdp_st = current_time - occu_pdp_age;
1158
1159     if (occu_pdp_st > proc_pdp_st) {
1160         /* OK we passed the pdp_st moment */
1161         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1162                                                                      * occurred before the latest
1163                                                                      * pdp_st moment*/
1164         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1165         *post_int = occu_pdp_age;   /* how much after it */
1166         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1167     } else {
1168         *pre_int = interval;
1169         *post_int = 0;
1170     }
1171
1172     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1173
1174 #ifdef DEBUG
1175     printf("proc_pdp_age %lu\t"
1176            "proc_pdp_st %lu\t"
1177            "occu_pfp_age %lu\t"
1178            "occu_pdp_st %lu\t"
1179            "int %lf\t"
1180            "pre_int %lf\t"
1181            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1182            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1183 #endif
1184
1185     /* compute the number of elapsed pdp_st moments */
1186     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1187 }
1188
1189 /*
1190  * Increment the PDP values by the values in pdp_new, or else initialize them.
1191  */
1192 static void simple_update(
1193     rrd_t *rrd,
1194     double interval,
1195     rrd_value_t *pdp_new)
1196 {
1197     int       i;
1198
1199     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1200         if (isnan(pdp_new[i])) {
1201             /* this is not really accurate if we use subsecond data arrival time
1202                should have thought of it when going subsecond resolution ...
1203                sorry next format change we will have it! */
1204             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1205                 floor(interval);
1206         } else {
1207             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1208                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1209             } else {
1210                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1211             }
1212         }
1213 #ifdef DEBUG
1214         fprintf(stderr,
1215                 "NO PDP  ds[%i]\t"
1216                 "value %10.2f\t"
1217                 "unkn_sec %5lu\n",
1218                 i,
1219                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1220                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1221 #endif
1222     }
1223 }
1224
1225 /*
1226  * Call process_pdp_st for each DS.
1227  *
1228  * Returns 0 on success, -1 on error.
1229  */
1230 static int process_all_pdp_st(
1231     rrd_t *rrd,
1232     double interval,
1233     double pre_int,
1234     double post_int,
1235     unsigned long elapsed_pdp_st,
1236     rrd_value_t *pdp_new,
1237     rrd_value_t *pdp_temp)
1238 {
1239     unsigned long ds_idx;
1240
1241     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1242        rate*seconds which occurred up to the last run.
1243        pdp_new[] contains rate*seconds from the latest run.
1244        pdp_temp[] will contain the rate for cdp */
1245
1246     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1247         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1248                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1249                            pdp_new, pdp_temp) == -1) {
1250             return -1;
1251         }
1252 #ifdef DEBUG
1253         fprintf(stderr, "PDP UPD ds[%lu]\t"
1254                 "elapsed_pdp_st %lu\t"
1255                 "pdp_temp %10.2f\t"
1256                 "new_prep %10.2f\t"
1257                 "new_unkn_sec %5lu\n",
1258                 ds_idx,
1259                 elapsed_pdp_st,
1260                 pdp_temp[ds_idx],
1261                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1262                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1263 #endif
1264     }
1265     return 0;
1266 }
1267
1268 /*
1269  * Process an update that occurs after one of the PDP moments.
1270  * Increments the PDP value, sets NAN if time greater than the
1271  * heartbeats have elapsed, processes CDEFs.
1272  *
1273  * Returns 0 on success, -1 on error.
1274  */
1275 static int process_pdp_st(
1276     rrd_t *rrd,
1277     unsigned long ds_idx,
1278     double interval,
1279     double pre_int,
1280     double post_int,
1281     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1282     rrd_value_t *pdp_new,
1283     rrd_value_t *pdp_temp)
1284 {
1285     int       i;
1286
1287     /* update pdp_prep to the current pdp_st. */
1288     double    pre_unknown = 0.0;
1289     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1290     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1291
1292     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1293
1294     rpnstack_init(&rpnstack);
1295
1296
1297     if (isnan(pdp_new[ds_idx])) {
1298         /* a final bit of unknown to be added before calculation
1299            we use a temporary variable for this so that we
1300            don't have to turn integer lines before using the value */
1301         pre_unknown = pre_int;
1302     } else {
1303         if (isnan(scratch[PDP_val].u_val)) {
1304             scratch[PDP_val].u_val = 0;
1305         }
1306         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1307     }
1308
1309     /* if too much of the pdp_prep is unknown we dump it */
1310     /* if the interval is larger thatn mrhb we get NAN */
1311     if ((interval > mrhb) ||
1312         (rrd->stat_head->pdp_step / 2.0 <
1313          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1314         pdp_temp[ds_idx] = DNAN;
1315     } else {
1316         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1317             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1318              pre_unknown);
1319     }
1320
1321     /* process CDEF data sources; remember each CDEF DS can
1322      * only reference other DS with a lower index number */
1323     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1324         rpnp_t   *rpnp;
1325
1326         rpnp =
1327             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1328         /* substitute data values for OP_VARIABLE nodes */
1329         for (i = 0; rpnp[i].op != OP_END; i++) {
1330             if (rpnp[i].op == OP_VARIABLE) {
1331                 rpnp[i].op = OP_NUMBER;
1332                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1333             }
1334         }
1335         /* run the rpn calculator */
1336         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1337             free(rpnp);
1338             rpnstack_free(&rpnstack);
1339             return -1;
1340         }
1341     }
1342
1343     /* make pdp_prep ready for the next run */
1344     if (isnan(pdp_new[ds_idx])) {
1345         /* this is not realy accurate if we use subsecond data arival time
1346            should have thought of it when going subsecond resolution ...
1347            sorry next format change we will have it! */
1348         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1349         scratch[PDP_val].u_val = DNAN;
1350     } else {
1351         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1352         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1353     }
1354     rpnstack_free(&rpnstack);
1355     return 0;
1356 }
1357
1358 /*
1359  * Iterate over all the RRAs for a given DS and:
1360  * 1. Decide whether to schedule a smooth later
1361  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1362  * 3. Update the CDP
1363  *
1364  * Returns 0 on success, -1 on error
1365  */
1366 static int update_all_cdp_prep(
1367     rrd_t *rrd,
1368     unsigned long *rra_step_cnt,
1369     unsigned long rra_begin,
1370     rrd_file_t *rrd_file,
1371     unsigned long elapsed_pdp_st,
1372     unsigned long proc_pdp_cnt,
1373     rrd_value_t **last_seasonal_coef,
1374     rrd_value_t **seasonal_coef,
1375     rrd_value_t *pdp_temp,
1376     unsigned long *rra_current,
1377     unsigned long *skip_update,
1378     int *schedule_smooth)
1379 {
1380     unsigned long rra_idx;
1381
1382     /* index into the CDP scratch array */
1383     enum cf_en current_cf;
1384     unsigned long rra_start;
1385
1386     /* number of rows to be updated in an RRA for a data value. */
1387     unsigned long start_pdp_offset;
1388
1389     rra_start = rra_begin;
1390     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1391         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1392         start_pdp_offset =
1393             rrd->rra_def[rra_idx].pdp_cnt -
1394             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1395         skip_update[rra_idx] = 0;
1396         if (start_pdp_offset <= elapsed_pdp_st) {
1397             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1398                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1399         } else {
1400             rra_step_cnt[rra_idx] = 0;
1401         }
1402
1403         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1404             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1405              * so that they will be correct for the next observed value; note that for
1406              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1407              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1408             if (rra_step_cnt[rra_idx] > 1) {
1409                 skip_update[rra_idx] = 1;
1410                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1411                                 elapsed_pdp_st, last_seasonal_coef);
1412                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1413                                 elapsed_pdp_st + 1, seasonal_coef);
1414             }
1415             /* periodically run a smoother for seasonal effects */
1416             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1417 #ifdef DEBUG
1418                 fprintf(stderr,
1419                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1420                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1421                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1422                         u_cnt);
1423 #endif
1424                 *schedule_smooth = 1;
1425             }
1426             *rra_current = rrd_tell(rrd_file);
1427         }
1428         if (rrd_test_error())
1429             return -1;
1430
1431         if (update_cdp_prep
1432             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1433              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1434              current_cf) == -1) {
1435             return -1;
1436         }
1437         rra_start +=
1438             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1439             sizeof(rrd_value_t);
1440     }
1441     return 0;
1442 }
1443
1444 /* 
1445  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1446  */
1447 static int do_schedule_smooth(
1448     rrd_t *rrd,
1449     unsigned long rra_idx,
1450     unsigned long elapsed_pdp_st)
1451 {
1452     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1453     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1454     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1455     unsigned long seasonal_smooth_idx =
1456         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1457     unsigned long *init_seasonal =
1458         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1459
1460     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1461      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1462      * really an RRA level, not a data source within RRA level parameter, but
1463      * the rra_def is read only for rrd_update (not flushed to disk). */
1464     if (*init_seasonal > BURNIN_CYCLES) {
1465         /* someone has no doubt invented a trick to deal with this wrap around,
1466          * but at least this code is clear. */
1467         if (seasonal_smooth_idx > cur_row) {
1468             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1469              * between PDP and CDP */
1470             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1471         }
1472         /* can't rely on negative numbers because we are working with
1473          * unsigned values */
1474         return (cur_row + elapsed_pdp_st >= row_cnt
1475                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1476     }
1477     /* mark off one of the burn-in cycles */
1478     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1479 }
1480
1481 /*
1482  * For a given RRA, iterate over the data sources and call the appropriate
1483  * consolidation function.
1484  *
1485  * Returns 0 on success, -1 on error.
1486  */
1487 static int update_cdp_prep(
1488     rrd_t *rrd,
1489     unsigned long elapsed_pdp_st,
1490     unsigned long start_pdp_offset,
1491     unsigned long *rra_step_cnt,
1492     int rra_idx,
1493     rrd_value_t *pdp_temp,
1494     rrd_value_t *last_seasonal_coef,
1495     rrd_value_t *seasonal_coef,
1496     int current_cf)
1497 {
1498     unsigned long ds_idx, cdp_idx;
1499
1500     /* update CDP_PREP areas */
1501     /* loop over data soures within each RRA */
1502     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1503
1504         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1505
1506         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1507             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1508                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1509                        elapsed_pdp_st, start_pdp_offset,
1510                        rrd->rra_def[rra_idx].pdp_cnt,
1511                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1512                        rra_idx, ds_idx);
1513         } else {
1514             /* Nothing to consolidate if there's one PDP per CDP. However, if
1515              * we've missed some PDPs, let's update null counters etc. */
1516             if (elapsed_pdp_st > 2) {
1517                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1518                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1519                           current_cf);
1520             }
1521         }
1522
1523         if (rrd_test_error())
1524             return -1;
1525     }                   /* endif data sources loop */
1526     return 0;
1527 }
1528
1529 /*
1530  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1531  * primary value, secondary value, and # of unknowns.
1532  */
1533 static void update_cdp(
1534     unival *scratch,
1535     int current_cf,
1536     rrd_value_t pdp_temp_val,
1537     unsigned long rra_step_cnt,
1538     unsigned long elapsed_pdp_st,
1539     unsigned long start_pdp_offset,
1540     unsigned long pdp_cnt,
1541     rrd_value_t xff,
1542     int i,
1543     int ii)
1544 {
1545     /* shorthand variables */
1546     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1547     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1548     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1549     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1550
1551     if (rra_step_cnt) {
1552         /* If we are in this block, as least 1 CDP value will be written to
1553          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1554          * to be written, then the "fill in" value is the CDP_secondary_val
1555          * entry. */
1556         if (isnan(pdp_temp_val)) {
1557             *cdp_unkn_pdp_cnt += start_pdp_offset;
1558             *cdp_secondary_val = DNAN;
1559         } else {
1560             /* CDP_secondary value is the RRA "fill in" value for intermediary
1561              * CDP data entries. No matter the CF, the value is the same because
1562              * the average, max, min, and last of a list of identical values is
1563              * the same, namely, the value itself. */
1564             *cdp_secondary_val = pdp_temp_val;
1565         }
1566
1567         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1568             *cdp_primary_val = DNAN;
1569             if (current_cf == CF_AVERAGE) {
1570                 *cdp_val =
1571                     initialize_average_carry_over(pdp_temp_val,
1572                                                   elapsed_pdp_st,
1573                                                   start_pdp_offset, pdp_cnt);
1574             } else {
1575                 *cdp_val = pdp_temp_val;
1576             }
1577         } else {
1578             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1579                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1580         }               /* endif meets xff value requirement for a valid value */
1581         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1582          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1583         if (isnan(pdp_temp_val))
1584             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1585         else
1586             *cdp_unkn_pdp_cnt = 0;
1587     } else {            /* rra_step_cnt[i]  == 0 */
1588
1589 #ifdef DEBUG
1590         if (isnan(*cdp_val)) {
1591             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1592                     i, ii);
1593         } else {
1594             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1595                     i, ii, *cdp_val);
1596         }
1597 #endif
1598         if (isnan(pdp_temp_val)) {
1599             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1600         } else {
1601             *cdp_val =
1602                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1603                                   current_cf, i, ii);
1604         }
1605     }
1606 }
1607
1608 /*
1609  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1610  * on the type of consolidation function.
1611  */
1612 static void initialize_cdp_val(
1613     unival *scratch,
1614     int current_cf,
1615     rrd_value_t pdp_temp_val,
1616     unsigned long elapsed_pdp_st,
1617     unsigned long start_pdp_offset,
1618     unsigned long pdp_cnt)
1619 {
1620     rrd_value_t cum_val, cur_val;
1621
1622     switch (current_cf) {
1623     case CF_AVERAGE:
1624         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1625         cur_val = IFDNAN(pdp_temp_val, 0.0);
1626         scratch[CDP_primary_val].u_val =
1627             (cum_val + cur_val * start_pdp_offset) /
1628             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1629         scratch[CDP_val].u_val =
1630             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1631                                           start_pdp_offset, pdp_cnt);
1632         break;
1633     case CF_MAXIMUM:
1634         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1635         cur_val = IFDNAN(pdp_temp_val, -DINF);
1636 #if 0
1637 #ifdef DEBUG
1638         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1639             fprintf(stderr,
1640                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1641                     i, ii);
1642             exit(-1);
1643         }
1644 #endif
1645 #endif
1646         if (cur_val > cum_val)
1647             scratch[CDP_primary_val].u_val = cur_val;
1648         else
1649             scratch[CDP_primary_val].u_val = cum_val;
1650         /* initialize carry over value */
1651         scratch[CDP_val].u_val = pdp_temp_val;
1652         break;
1653     case CF_MINIMUM:
1654         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1655         cur_val = IFDNAN(pdp_temp_val, DINF);
1656 #if 0
1657 #ifdef DEBUG
1658         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1659             fprintf(stderr,
1660                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1661                     ii);
1662             exit(-1);
1663         }
1664 #endif
1665 #endif
1666         if (cur_val < cum_val)
1667             scratch[CDP_primary_val].u_val = cur_val;
1668         else
1669             scratch[CDP_primary_val].u_val = cum_val;
1670         /* initialize carry over value */
1671         scratch[CDP_val].u_val = pdp_temp_val;
1672         break;
1673     case CF_LAST:
1674     default:
1675         scratch[CDP_primary_val].u_val = pdp_temp_val;
1676         /* initialize carry over value */
1677         scratch[CDP_val].u_val = pdp_temp_val;
1678         break;
1679     }
1680 }
1681
1682 /*
1683  * Update the consolidation function for Holt-Winters functions as
1684  * well as other functions that don't actually consolidate multiple
1685  * PDPs.
1686  */
1687 static void reset_cdp(
1688     rrd_t *rrd,
1689     unsigned long elapsed_pdp_st,
1690     rrd_value_t *pdp_temp,
1691     rrd_value_t *last_seasonal_coef,
1692     rrd_value_t *seasonal_coef,
1693     int rra_idx,
1694     int ds_idx,
1695     int cdp_idx,
1696     enum cf_en current_cf)
1697 {
1698     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1699
1700     switch (current_cf) {
1701     case CF_AVERAGE:
1702     default:
1703         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1704         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1705         break;
1706     case CF_SEASONAL:
1707     case CF_DEVSEASONAL:
1708         /* need to update cached seasonal values, so they are consistent
1709          * with the bulk update */
1710         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1711          * CDP_last_deviation are the same. */
1712         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1713         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1714         break;
1715     case CF_HWPREDICT:
1716     case CF_MHWPREDICT:
1717         /* need to update the null_count and last_null_count.
1718          * even do this for non-DNAN pdp_temp because the
1719          * algorithm is not learning from batch updates. */
1720         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1721         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1722         /* fall through */
1723     case CF_DEVPREDICT:
1724         scratch[CDP_primary_val].u_val = DNAN;
1725         scratch[CDP_secondary_val].u_val = DNAN;
1726         break;
1727     case CF_FAILURES:
1728         /* do not count missed bulk values as failures */
1729         scratch[CDP_primary_val].u_val = 0;
1730         scratch[CDP_secondary_val].u_val = 0;
1731         /* need to reset violations buffer.
1732          * could do this more carefully, but for now, just
1733          * assume a bulk update wipes away all violations. */
1734         erase_violations(rrd, cdp_idx, rra_idx);
1735         break;
1736     }
1737 }
1738
1739 static rrd_value_t initialize_average_carry_over(
1740     rrd_value_t pdp_temp_val,
1741     unsigned long elapsed_pdp_st,
1742     unsigned long start_pdp_offset,
1743     unsigned long pdp_cnt)
1744 {
1745     /* initialize carry over value */
1746     if (isnan(pdp_temp_val)) {
1747         return DNAN;
1748     }
1749     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1750 }
1751
1752 /*
1753  * Update or initialize a CDP value based on the consolidation
1754  * function.
1755  *
1756  * Returns the new value.
1757  */
1758 static rrd_value_t calculate_cdp_val(
1759     rrd_value_t cdp_val,
1760     rrd_value_t pdp_temp_val,
1761     unsigned long elapsed_pdp_st,
1762     int current_cf,
1763 #ifdef DEBUG
1764     int i,
1765     int ii
1766 #else
1767     int UNUSED(i),
1768     int UNUSED(ii)
1769 #endif
1770     )
1771 {
1772     if (isnan(cdp_val)) {
1773         if (current_cf == CF_AVERAGE) {
1774             pdp_temp_val *= elapsed_pdp_st;
1775         }
1776 #ifdef DEBUG
1777         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1778                 i, ii, pdp_temp_val);
1779 #endif
1780         return pdp_temp_val;
1781     }
1782     if (current_cf == CF_AVERAGE)
1783         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1784     if (current_cf == CF_MINIMUM)
1785         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1786     if (current_cf == CF_MAXIMUM)
1787         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1788
1789     return pdp_temp_val;
1790 }
1791
1792 /*
1793  * For each RRA, update the seasonal values and then call update_aberrant_CF
1794  * for each data source.
1795  *
1796  * Return 0 on success, -1 on error.
1797  */
1798 static int update_aberrant_cdps(
1799     rrd_t *rrd,
1800     rrd_file_t *rrd_file,
1801     unsigned long rra_begin,
1802     unsigned long *rra_current,
1803     unsigned long elapsed_pdp_st,
1804     rrd_value_t *pdp_temp,
1805     rrd_value_t **seasonal_coef)
1806 {
1807     unsigned long rra_idx, ds_idx, j;
1808
1809     /* number of PDP steps since the last update that
1810      * are assigned to the first CDP to be generated
1811      * since the last update. */
1812     unsigned short scratch_idx;
1813     unsigned long rra_start;
1814     enum cf_en current_cf;
1815
1816     /* this loop is only entered if elapsed_pdp_st < 3 */
1817     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1818          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1819         rra_start = rra_begin;
1820         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1821             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1822                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1823                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1824                     if (scratch_idx == CDP_primary_val) {
1825                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1826                                         elapsed_pdp_st + 1, seasonal_coef);
1827                     } else {
1828                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1829                                         elapsed_pdp_st + 2, seasonal_coef);
1830                     }
1831                     *rra_current = rrd_tell(rrd_file);
1832                 }
1833                 if (rrd_test_error())
1834                     return -1;
1835                 /* loop over data soures within each RRA */
1836                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1837                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1838                                        rra_idx * (rrd->stat_head->ds_cnt) +
1839                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1840                                        *seasonal_coef);
1841                 }
1842             }
1843             rra_start += rrd->rra_def[rra_idx].row_cnt
1844                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1845         }
1846     }
1847     return 0;
1848 }
1849
1850 /* 
1851  * Move sequentially through the file, writing one RRA at a time.  Note this
1852  * architecture divorces the computation of CDP with flushing updated RRA
1853  * entries to disk.
1854  *
1855  * Return 0 on success, -1 on error.
1856  */
1857 static int write_to_rras(
1858     rrd_t *rrd,
1859     rrd_file_t *rrd_file,
1860     unsigned long *rra_step_cnt,
1861     unsigned long rra_begin,
1862     unsigned long *rra_current,
1863     time_t current_time,
1864     unsigned long *skip_update,
1865     rrd_info_t ** pcdp_summary)
1866 {
1867     unsigned long rra_idx;
1868     unsigned long rra_start;
1869     time_t    rra_time = 0; /* time of update for a RRA */
1870
1871     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1872     
1873     /* Ready to write to disk */
1874     rra_start = rra_begin;
1875
1876     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1878         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1879
1880         /* for cdp_prep */
1881         unsigned short scratch_idx;
1882         unsigned long step_subtract;
1883
1884         for (scratch_idx = CDP_primary_val,
1885                  step_subtract = 1;
1886              rra_step_cnt[rra_idx] > 0;
1887              rra_step_cnt[rra_idx]--,
1888                  scratch_idx = CDP_secondary_val,
1889                  step_subtract = 2) {
1890
1891             unsigned long rra_pos_new;
1892 #ifdef DEBUG
1893             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1894 #endif
1895             /* increment, with wrap-around */
1896             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1897               rra_ptr->cur_row = 0;
1898
1899             /* we know what our position should be */
1900             rra_pos_new = rra_start
1901               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1902
1903             /* re-seek if the position is wrong or we wrapped around */
1904             if (rra_pos_new != *rra_current || rra_ptr->cur_row == 0) {
1905                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1906                     rrd_set_error("seek error in rrd");
1907                     return -1;
1908                 }
1909                 *rra_current = rra_pos_new;
1910             }
1911 #ifdef DEBUG
1912             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1913 #endif
1914
1915             if (skip_update[rra_idx])
1916                 continue;
1917
1918             if (*pcdp_summary != NULL) {
1919                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1920
1921                 rra_time = (current_time - current_time % step_time)
1922                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1923             }
1924
1925             if (write_RRA_row
1926                 (rrd_file, rrd, rra_idx, rra_current, scratch_idx,
1927                  pcdp_summary, rra_time) == -1)
1928                 return -1;
1929         }
1930
1931         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1932     } /* RRA LOOP */
1933
1934     return 0;
1935 }
1936
1937 /*
1938  * Write out one row of values (one value per DS) to the archive.
1939  *
1940  * Returns 0 on success, -1 on error.
1941  */
1942 static int write_RRA_row(
1943     rrd_file_t *rrd_file,
1944     rrd_t *rrd,
1945     unsigned long rra_idx,
1946     unsigned long *rra_current,
1947     unsigned short CDP_scratch_idx,
1948     rrd_info_t ** pcdp_summary,
1949     time_t rra_time)
1950 {
1951     unsigned long ds_idx, cdp_idx;
1952     rrd_infoval_t iv;
1953
1954     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1955         /* compute the cdp index */
1956         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1957 #ifdef DEBUG
1958         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1959                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1960                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1961 #endif
1962         if (*pcdp_summary != NULL) {
1963             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1964             /* append info to the return hash */
1965             *pcdp_summary = rrd_info_push(*pcdp_summary,
1966                                           sprintf_alloc
1967                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1968                                            rrd->rra_def[rra_idx].cf_nam,
1969                                            rrd->rra_def[rra_idx].pdp_cnt,
1970                                            rrd->ds_def[ds_idx].ds_nam),
1971                                           RD_I_VAL, iv);
1972         }
1973         if (rrd_write(rrd_file,
1974                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1975                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1976             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1977             return -1;
1978         }
1979         *rra_current += sizeof(rrd_value_t);
1980     }
1981     return 0;
1982 }
1983
1984 /*
1985  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1986  *
1987  * Returns 0 on success, -1 otherwise
1988  */
1989 static int smooth_all_rras(
1990     rrd_t *rrd,
1991     rrd_file_t *rrd_file,
1992     unsigned long rra_begin)
1993 {
1994     unsigned long rra_start = rra_begin;
1995     unsigned long rra_idx;
1996
1997     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1998         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1999             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2000 #ifdef DEBUG
2001             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2002 #endif
2003             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2004             if (rrd_test_error())
2005                 return -1;
2006         }
2007         rra_start += rrd->rra_def[rra_idx].row_cnt
2008             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2009     }
2010     return 0;
2011 }
2012
2013 #ifndef HAVE_MMAP
2014 /*
2015  * Flush changes to disk (unless we're using mmap)
2016  *
2017  * Returns 0 on success, -1 otherwise
2018  */
2019 static int write_changes_to_disk(
2020     rrd_t *rrd,
2021     rrd_file_t *rrd_file,
2022     int version)
2023 {
2024     /* we just need to write back the live header portion now */
2025     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2026                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2027                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2028                  SEEK_SET) != 0) {
2029         rrd_set_error("seek rrd for live header writeback");
2030         return -1;
2031     }
2032     if (version >= 3) {
2033         if (rrd_write(rrd_file, rrd->live_head,
2034                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2035             rrd_set_error("rrd_write live_head to rrd");
2036             return -1;
2037         }
2038     } else {
2039         if (rrd_write(rrd_file, rrd->legacy_last_up,
2040                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2041             rrd_set_error("rrd_write live_head to rrd");
2042             return -1;
2043         }
2044     }
2045
2046
2047     if (rrd_write(rrd_file, rrd->pdp_prep,
2048                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2049         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2050         rrd_set_error("rrd_write pdp_prep to rrd");
2051         return -1;
2052     }
2053
2054     if (rrd_write(rrd_file, rrd->cdp_prep,
2055                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2056                   rrd->stat_head->ds_cnt)
2057         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2058                       rrd->stat_head->ds_cnt)) {
2059
2060         rrd_set_error("rrd_write cdp_prep to rrd");
2061         return -1;
2062     }
2063
2064     if (rrd_write(rrd_file, rrd->rra_ptr,
2065                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2066         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2067         rrd_set_error("rrd_write rra_ptr to rrd");
2068         return -1;
2069     }
2070     return 0;
2071 }
2072 #endif