added note on %S
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     rrd_info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     unsigned long *rra_current,
99     time_t *current_time,
100     unsigned long *current_time_usec,
101     rrd_value_t *pdp_temp,
102     rrd_value_t *pdp_new,
103     unsigned long *rra_step_cnt,
104     char **updvals,
105     long *tmpl_idx,
106     unsigned long tmpl_cnt,
107     rrd_info_t ** pcdp_summary,
108     int version,
109     unsigned long *skip_update,
110     int *schedule_smooth);
111
112 static int parse_ds(
113     rrd_t *rrd,
114     char **updvals,
115     long *tmpl_idx,
116     char *input,
117     unsigned long tmpl_cnt,
118     time_t *current_time,
119     unsigned long *current_time_usec,
120     int version);
121
122 static int get_time_from_reading(
123     rrd_t *rrd,
124     char timesyntax,
125     char **updvals,
126     time_t *current_time,
127     unsigned long *current_time_usec,
128     int version);
129
130 static int update_pdp_prep(
131     rrd_t *rrd,
132     char **updvals,
133     rrd_value_t *pdp_new,
134     double interval);
135
136 static int calculate_elapsed_steps(
137     rrd_t *rrd,
138     unsigned long current_time,
139     unsigned long current_time_usec,
140     double interval,
141     double *pre_int,
142     double *post_int,
143     unsigned long *proc_pdp_cnt);
144
145 static void simple_update(
146     rrd_t *rrd,
147     double interval,
148     rrd_value_t *pdp_new);
149
150 static int process_all_pdp_st(
151     rrd_t *rrd,
152     double interval,
153     double pre_int,
154     double post_int,
155     unsigned long elapsed_pdp_st,
156     rrd_value_t *pdp_new,
157     rrd_value_t *pdp_temp);
158
159 static int process_pdp_st(
160     rrd_t *rrd,
161     unsigned long ds_idx,
162     double interval,
163     double pre_int,
164     double post_int,
165     long diff_pdp_st,
166     rrd_value_t *pdp_new,
167     rrd_value_t *pdp_temp);
168
169 static int update_all_cdp_prep(
170     rrd_t *rrd,
171     unsigned long *rra_step_cnt,
172     unsigned long rra_begin,
173     rrd_file_t *rrd_file,
174     unsigned long elapsed_pdp_st,
175     unsigned long proc_pdp_cnt,
176     rrd_value_t **last_seasonal_coef,
177     rrd_value_t **seasonal_coef,
178     rrd_value_t *pdp_temp,
179     unsigned long *rra_current,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long *rra_current,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     unsigned long *rra_current,
259     time_t current_time,
260     unsigned long *skip_update,
261     rrd_info_t ** pcdp_summary);
262
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned long *rra_current,
268     unsigned short CDP_scratch_idx,
269     rrd_info_t ** pcdp_summary,
270     time_t rra_time);
271
272 static int smooth_all_rras(
273     rrd_t *rrd,
274     rrd_file_t *rrd_file,
275     unsigned long rra_begin);
276
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279     rrd_t *rrd,
280     rrd_file_t *rrd_file,
281     int version);
282 #endif
283
284 /*
285  * normalize time as returned by gettimeofday. usec part must
286  * be always >= 0
287  */
288 static inline void normalize_time(
289     struct timeval *t)
290 {
291     if (t->tv_usec < 0) {
292         t->tv_sec--;
293         t->tv_usec += 1e6L;
294     }
295 }
296
297 /*
298  * Sets current_time and current_time_usec based on the current time.
299  * current_time_usec is set to 0 if the version number is 1 or 2.
300  */
301 static inline void initialize_time(
302     time_t *current_time,
303     unsigned long *current_time_usec,
304     int version)
305 {
306     struct timeval tmp_time;    /* used for time conversion */
307
308     gettimeofday(&tmp_time, 0);
309     normalize_time(&tmp_time);
310     *current_time = tmp_time.tv_sec;
311     if (version >= 3) {
312         *current_time_usec = tmp_time.tv_usec;
313     } else {
314         *current_time_usec = 0;
315     }
316 }
317
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319
320 rrd_info_t *rrd_update_v(
321     int argc,
322     char **argv)
323 {
324     char     *tmplt = NULL;
325     rrd_info_t *result = NULL;
326     rrd_infoval_t rc;
327     struct option long_options[] = {
328         {"template", required_argument, 0, 't'},
329         {0, 0, 0, 0}
330     };
331
332     rc.u_int = -1;
333     optind = 0;
334     opterr = 0;         /* initialize getopt */
335
336     while (1) {
337         int       option_index = 0;
338         int       opt;
339
340         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341
342         if (opt == EOF)
343             break;
344
345         switch (opt) {
346         case 't':
347             tmplt = optarg;
348             break;
349
350         case '?':
351             rrd_set_error("unknown option '%s'", argv[optind - 1]);
352             goto end_tag;
353         }
354     }
355
356     /* need at least 2 arguments: filename, data. */
357     if (argc - optind < 2) {
358         rrd_set_error("Not enough arguments");
359         goto end_tag;
360     }
361     rc.u_int = 0;
362     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363     rc.u_int = _rrd_update(argv[optind], tmplt,
364                            argc - optind - 1,
365                            (const char **) (argv + optind + 1), result);
366     result->value.u_int = rc.u_int;
367   end_tag:
368     return result;
369 }
370
371 int rrd_update(
372     int argc,
373     char **argv)
374 {
375     struct option long_options[] = {
376         {"template", required_argument, 0, 't'},
377         {0, 0, 0, 0}
378     };
379     int       option_index = 0;
380     int       opt;
381     char     *tmplt = NULL;
382     int       rc = -1;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case '?':
399             rrd_set_error("unknown option '%s'", argv[optind - 1]);
400             goto out;
401         }
402     }
403
404     /* need at least 2 arguments: filename, data. */
405     if (argc - optind < 2) {
406         rrd_set_error("Not enough arguments");
407         goto out;
408     }
409
410     rc = rrd_update_r(argv[optind], tmplt,
411                       argc - optind - 1, (const char **) (argv + optind + 1));
412   out:
413     free(tmplt);
414     return rc;
415 }
416
417 int rrd_update_r(
418     const char *filename,
419     const char *tmplt,
420     int argc,
421     const char **argv)
422 {
423     return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
425
426 int _rrd_update(
427     const char *filename,
428     const char *tmplt,
429     int argc,
430     const char **argv,
431     rrd_info_t * pcdp_summary)
432 {
433
434     int       arg_i = 2;
435
436     unsigned long rra_begin;    /* byte pointer to the rra
437                                  * area in the rrd file.  this
438                                  * pointer never changes value */
439     unsigned long rra_current;  /* byte pointer to the current write
440                                  * spot in the rrd file. */
441     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
442                              * to the existing entry */
443     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
444                              * to the cdp values */
445
446     long     *tmpl_idx; /* index representing the settings
447                          * transported by the tmplt index */
448     unsigned long tmpl_cnt = 2; /* time and data */
449     rrd_t     rrd;
450     time_t    current_time = 0;
451     unsigned long current_time_usec = 0;    /* microseconds part of current time */
452     char    **updvals;
453     int       schedule_smooth = 0;
454
455     /* number of elapsed PDP steps since last update */
456     unsigned long *rra_step_cnt = NULL;
457
458     int       version;  /* rrd version */
459     rrd_file_t *rrd_file;
460     char     *arg_copy; /* for processing the argv */
461     unsigned long *skip_update; /* RRAs to advance but not write */
462
463     /* need at least 1 arguments: data. */
464     if (argc < 1) {
465         rrd_set_error("Not enough arguments");
466         goto err_out;
467     }
468
469     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470         goto err_free;
471     }
472     /* We are now at the beginning of the rra's */
473     rra_current = rra_begin = rrd_file->header_len;
474
475     version = atoi(rrd.stat_head->version);
476
477     initialize_time(&current_time, &current_time_usec, version);
478
479     /* get exclusive lock to whole file.
480      * lock gets removed when we close the file.
481      */
482     if (rrd_lock(rrd_file) != 0) {
483         rrd_set_error("could not lock RRD");
484         goto err_close;
485     }
486
487     if (allocate_data_structures(&rrd, &updvals,
488                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489                                  &rra_step_cnt, &skip_update,
490                                  &pdp_new) == -1) {
491         goto err_close;
492     }
493
494     /* loop through the arguments. */
495     for (arg_i = 0; arg_i < argc; arg_i++) {
496         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497             rrd_set_error("failed duplication argv entry");
498             break;
499         }
500         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501                         &current_time, &current_time_usec, pdp_temp, pdp_new,
502                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503                         &pcdp_summary, version, skip_update,
504                         &schedule_smooth) == -1) {
505             free(arg_copy);
506             break;
507         }
508         free(arg_copy);
509     }
510
511     free(rra_step_cnt);
512
513     /* if we got here and if there is an error and if the file has not been
514      * written to, then close things up and return. */
515     if (rrd_test_error()) {
516         goto err_free_structures;
517     }
518 #ifndef HAVE_MMAP
519     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
520         goto err_free_structures;
521     }
522 #endif
523
524     /* calling the smoothing code here guarantees at most one smoothing
525      * operation per rrd_update call. Unfortunately, it is possible with bulk
526      * updates, or a long-delayed update for smoothing to occur off-schedule.
527      * This really isn't critical except during the burn-in cycles. */
528     if (schedule_smooth) {
529         smooth_all_rras(&rrd, rrd_file, rra_begin);
530     }
531
532 /*    rrd_dontneed(rrd_file,&rrd); */
533     rrd_free(&rrd);
534     rrd_close(rrd_file);
535
536     free(pdp_new);
537     free(tmpl_idx);
538     free(pdp_temp);
539     free(skip_update);
540     free(updvals);
541     return 0;
542
543   err_free_structures:
544     free(pdp_new);
545     free(tmpl_idx);
546     free(pdp_temp);
547     free(skip_update);
548     free(updvals);
549   err_close:
550     rrd_close(rrd_file);
551   err_free:
552     rrd_free(&rrd);
553   err_out:
554     return -1;
555 }
556
557 /*
558  * get exclusive lock to whole file.
559  * lock gets removed when we close the file
560  *
561  * returns 0 on success
562  */
563 int rrd_lock(
564     rrd_file_t *file)
565 {
566     int       rcstat;
567
568     {
569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
570         struct _stat st;
571
572         if (_fstat(file->fd, &st) == 0) {
573             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
574         } else {
575             rcstat = -1;
576         }
577 #else
578         struct flock lock;
579
580         lock.l_type = F_WRLCK;  /* exclusive write lock */
581         lock.l_len = 0; /* whole file */
582         lock.l_start = 0;   /* start of file */
583         lock.l_whence = SEEK_SET;   /* end of file */
584
585         rcstat = fcntl(file->fd, F_SETLK, &lock);
586 #endif
587     }
588
589     return (rcstat);
590 }
591
592 /*
593  * Allocate some important arrays used, and initialize the template.
594  *
595  * When it returns, either all of the structures are allocated
596  * or none of them are.
597  *
598  * Returns 0 on success, -1 on error.
599  */
600 static int allocate_data_structures(
601     rrd_t *rrd,
602     char ***updvals,
603     rrd_value_t **pdp_temp,
604     const char *tmplt,
605     long **tmpl_idx,
606     unsigned long *tmpl_cnt,
607     unsigned long **rra_step_cnt,
608     unsigned long **skip_update,
609     rrd_value_t **pdp_new)
610 {
611     unsigned  i, ii;
612     if ((*updvals = (char **) malloc(sizeof(char *)
613                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
614         rrd_set_error("allocating updvals pointer array.");
615         return -1;
616     }
617     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
618                                             * rrd->stat_head->ds_cnt)) ==
619         NULL) {
620         rrd_set_error("allocating pdp_temp.");
621         goto err_free_updvals;
622     }
623     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
624                                                  *
625                                                  rrd->stat_head->rra_cnt)) ==
626         NULL) {
627         rrd_set_error("allocating skip_update.");
628         goto err_free_pdp_temp;
629     }
630     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
631                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
632         rrd_set_error("allocating tmpl_idx.");
633         goto err_free_skip_update;
634     }
635     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
636                                                   *
637                                                   (rrd->stat_head->
638                                                    rra_cnt))) == NULL) {
639         rrd_set_error("allocating rra_step_cnt.");
640         goto err_free_tmpl_idx;
641     }
642
643     /* initialize tmplt redirector */
644     /* default config example (assume DS 1 is a CDEF DS)
645        tmpl_idx[0] -> 0; (time)
646        tmpl_idx[1] -> 1; (DS 0)
647        tmpl_idx[2] -> 3; (DS 2)
648        tmpl_idx[3] -> 4; (DS 3) */
649     (*tmpl_idx)[0] = 0; /* time */
650     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
651         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
652             (*tmpl_idx)[ii++] = i;
653     }
654     *tmpl_cnt = ii;
655
656     if (tmplt != NULL) {
657         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
658             goto err_free_rra_step_cnt;
659         }
660     }
661
662     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
663                                            * rrd->stat_head->ds_cnt)) == NULL) {
664         rrd_set_error("allocating pdp_new.");
665         goto err_free_rra_step_cnt;
666     }
667
668     return 0;
669
670   err_free_rra_step_cnt:
671     free(*rra_step_cnt);
672   err_free_tmpl_idx:
673     free(*tmpl_idx);
674   err_free_skip_update:
675     free(*skip_update);
676   err_free_pdp_temp:
677     free(*pdp_temp);
678   err_free_updvals:
679     free(*updvals);
680     return -1;
681 }
682
683 /*
684  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
685  *
686  * Returns 0 on success.
687  */
688 static int parse_template(
689     rrd_t *rrd,
690     const char *tmplt,
691     unsigned long *tmpl_cnt,
692     long *tmpl_idx)
693 {
694     char     *dsname, *tmplt_copy;
695     unsigned int tmpl_len, i;
696     int       ret = 0;
697
698     *tmpl_cnt = 1;      /* the first entry is the time */
699
700     /* we should work on a writeable copy here */
701     if ((tmplt_copy = strdup(tmplt)) == NULL) {
702         rrd_set_error("error copying tmplt '%s'", tmplt);
703         ret = -1;
704         goto out;
705     }
706
707     dsname = tmplt_copy;
708     tmpl_len = strlen(tmplt_copy);
709     for (i = 0; i <= tmpl_len; i++) {
710         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
711             tmplt_copy[i] = '\0';
712             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
713                 rrd_set_error("tmplt contains more DS definitions than RRD");
714                 ret = -1;
715                 goto out_free_tmpl_copy;
716             }
717             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
718                 rrd_set_error("unknown DS name '%s'", dsname);
719                 ret = -1;
720                 goto out_free_tmpl_copy;
721             }
722             /* go to the next entry on the tmplt_copy */
723             if (i < tmpl_len)
724                 dsname = &tmplt_copy[i + 1];
725         }
726     }
727   out_free_tmpl_copy:
728     free(tmplt_copy);
729   out:
730     return ret;
731 }
732
733 /*
734  * Parse an update string, updates the primary data points (PDPs)
735  * and consolidated data points (CDPs), and writes changes to the RRAs.
736  *
737  * Returns 0 on success, -1 on error.
738  */
739 static int process_arg(
740     char *step_start,
741     rrd_t *rrd,
742     rrd_file_t *rrd_file,
743     unsigned long rra_begin,
744     unsigned long *rra_current,
745     time_t *current_time,
746     unsigned long *current_time_usec,
747     rrd_value_t *pdp_temp,
748     rrd_value_t *pdp_new,
749     unsigned long *rra_step_cnt,
750     char **updvals,
751     long *tmpl_idx,
752     unsigned long tmpl_cnt,
753     rrd_info_t ** pcdp_summary,
754     int version,
755     unsigned long *skip_update,
756     int *schedule_smooth)
757 {
758     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
759
760     /* a vector of future Holt-Winters seasonal coefs */
761     unsigned long elapsed_pdp_st;
762
763     double    interval, pre_int, post_int;  /* interval between this and
764                                              * the last run */
765     unsigned long proc_pdp_cnt;
766     unsigned long rra_start;
767
768     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769                  current_time, current_time_usec, version) == -1) {
770         return -1;
771     }
772     /* seek to the beginning of the rra's */
773     if (*rra_current != rra_begin) {
774 #ifndef HAVE_MMAP
775         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
776             rrd_set_error("seek error in rrd");
777             return -1;
778         }
779 #endif
780         *rra_current = rra_begin;
781     }
782     rra_start = rra_begin;
783
784     interval = (double) (*current_time - rrd->live_head->last_up)
785         + (double) ((long) *current_time_usec -
786                     (long) rrd->live_head->last_up_usec) / 1e6f;
787
788     /* process the data sources and update the pdp_prep 
789      * area accordingly */
790     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
791         return -1;
792     }
793
794     elapsed_pdp_st = calculate_elapsed_steps(rrd,
795                                              *current_time,
796                                              *current_time_usec, interval,
797                                              &pre_int, &post_int,
798                                              &proc_pdp_cnt);
799
800     /* has a pdp_st moment occurred since the last run ? */
801     if (elapsed_pdp_st == 0) {
802         /* no we have not passed a pdp_st moment. therefore update is simple */
803         simple_update(rrd, interval, pdp_new);
804     } else {
805         /* an pdp_st has occurred. */
806         if (process_all_pdp_st(rrd, interval,
807                                pre_int, post_int,
808                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
809             return -1;
810         }
811         if (update_all_cdp_prep(rrd, rra_step_cnt,
812                                 rra_begin, rrd_file,
813                                 elapsed_pdp_st,
814                                 proc_pdp_cnt,
815                                 &last_seasonal_coef,
816                                 &seasonal_coef,
817                                 pdp_temp, rra_current,
818                                 skip_update, schedule_smooth) == -1) {
819             goto err_free_coefficients;
820         }
821         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
822                                  elapsed_pdp_st, pdp_temp,
823                                  &seasonal_coef) == -1) {
824             goto err_free_coefficients;
825         }
826         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
827                           rra_current, *current_time, skip_update,
828                           pcdp_summary) == -1) {
829             goto err_free_coefficients;
830         }
831     }                   /* endif a pdp_st has occurred */
832     rrd->live_head->last_up = *current_time;
833     rrd->live_head->last_up_usec = *current_time_usec;
834
835     if (version < 3) {
836         *rrd->legacy_last_up = rrd->live_head->last_up;
837     }
838     free(seasonal_coef);
839     free(last_seasonal_coef);
840     return 0;
841
842   err_free_coefficients:
843     free(seasonal_coef);
844     free(last_seasonal_coef);
845     return -1;
846 }
847
848 /*
849  * Parse a DS string (time + colon-separated values), storing the
850  * results in current_time, current_time_usec, and updvals.
851  *
852  * Returns 0 on success, -1 on error.
853  */
854 static int parse_ds(
855     rrd_t *rrd,
856     char **updvals,
857     long *tmpl_idx,
858     char *input,
859     unsigned long tmpl_cnt,
860     time_t *current_time,
861     unsigned long *current_time_usec,
862     int version)
863 {
864     char     *p;
865     unsigned long i;
866     char      timesyntax;
867
868     updvals[0] = input;
869     /* initialize all ds input to unknown except the first one
870        which has always got to be set */
871     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
872         updvals[i] = "U";
873
874     /* separate all ds elements; first must be examined separately
875        due to alternate time syntax */
876     if ((p = strchr(input, '@')) != NULL) {
877         timesyntax = '@';
878     } else if ((p = strchr(input, ':')) != NULL) {
879         timesyntax = ':';
880     } else {
881         rrd_set_error("expected timestamp not found in data source from %s",
882                       input);
883         return -1;
884     }
885     *p = '\0';
886     i = 1;
887     updvals[tmpl_idx[i++]] = p + 1;
888     while (*(++p)) {
889         if (*p == ':') {
890             *p = '\0';
891             if (i < tmpl_cnt) {
892                 updvals[tmpl_idx[i++]] = p + 1;
893             }
894         }
895     }
896
897     if (i != tmpl_cnt) {
898         rrd_set_error("expected %lu data source readings (got %lu) from %s",
899                       tmpl_cnt - 1, i, input);
900         return -1;
901     }
902
903     if (get_time_from_reading(rrd, timesyntax, updvals,
904                               current_time, current_time_usec,
905                               version) == -1) {
906         return -1;
907     }
908     return 0;
909 }
910
911 /*
912  * Parse the time in a DS string, store it in current_time and 
913  * current_time_usec and verify that it's later than the last
914  * update for this DS.
915  *
916  * Returns 0 on success, -1 on error.
917  */
918 static int get_time_from_reading(
919     rrd_t *rrd,
920     char timesyntax,
921     char **updvals,
922     time_t *current_time,
923     unsigned long *current_time_usec,
924     int version)
925 {
926     double    tmp;
927     char     *parsetime_error = NULL;
928     char     *old_locale;
929     rrd_time_value_t ds_tv;
930     struct timeval tmp_time;    /* used for time conversion */
931
932     /* get the time from the reading ... handle N */
933     if (timesyntax == '@') {    /* at-style */
934         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
935             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
936             return -1;
937         }
938         if (ds_tv.type == RELATIVE_TO_END_TIME ||
939             ds_tv.type == RELATIVE_TO_START_TIME) {
940             rrd_set_error("specifying time relative to the 'start' "
941                           "or 'end' makes no sense here: %s", updvals[0]);
942             return -1;
943         }
944         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
945         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
946     } else if (strcmp(updvals[0], "N") == 0) {
947         gettimeofday(&tmp_time, 0);
948         normalize_time(&tmp_time);
949         *current_time = tmp_time.tv_sec;
950         *current_time_usec = tmp_time.tv_usec;
951     } else {
952         old_locale = setlocale(LC_NUMERIC, "C");
953         tmp = strtod(updvals[0], 0);
954         setlocale(LC_NUMERIC, old_locale);
955         *current_time = floor(tmp);
956         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
957     }
958     /* dont do any correction for old version RRDs */
959     if (version < 3)
960         *current_time_usec = 0;
961
962     if (*current_time < rrd->live_head->last_up ||
963         (*current_time == rrd->live_head->last_up &&
964          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
965         rrd_set_error("illegal attempt to update using time %ld when "
966                       "last update time is %ld (minimum one second step)",
967                       *current_time, rrd->live_head->last_up);
968         return -1;
969     }
970     return 0;
971 }
972
973 /*
974  * Update pdp_new by interpreting the updvals according to the DS type
975  * (COUNTER, GAUGE, etc.).
976  *
977  * Returns 0 on success, -1 on error.
978  */
979 static int update_pdp_prep(
980     rrd_t *rrd,
981     char **updvals,
982     rrd_value_t *pdp_new,
983     double interval)
984 {
985     unsigned long ds_idx;
986     int       ii;
987     char     *endptr;   /* used in the conversion */
988     double    rate;
989     char     *old_locale;
990     enum dst_en dst_idx;
991
992     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
993         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
994
995         /* make sure we do not build diffs with old last_ds values */
996         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
997             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
998             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
999         }
1000
1001         /* NOTE: DST_CDEF should never enter this if block, because
1002          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1003          * accidently specified a value for the DST_CDEF. To handle this case,
1004          * an extra check is required. */
1005
1006         if ((updvals[ds_idx + 1][0] != 'U') &&
1007             (dst_idx != DST_CDEF) &&
1008             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1009             rate = DNAN;
1010
1011             /* pdp_new contains rate * time ... eg the bytes transferred during
1012              * the interval. Doing it this way saves a lot of math operations
1013              */
1014             switch (dst_idx) {
1015             case DST_COUNTER:
1016             case DST_DERIVE:
1017                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1018                     if ((updvals[ds_idx + 1][ii] < '0'
1019                          || updvals[ds_idx + 1][ii] > '9')
1020                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1021                         rrd_set_error("not a simple integer: '%s'",
1022                                       updvals[ds_idx + 1]);
1023                         return -1;
1024                     }
1025                 }
1026                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1027                     pdp_new[ds_idx] =
1028                         rrd_diff(updvals[ds_idx + 1],
1029                                  rrd->pdp_prep[ds_idx].last_ds);
1030                     if (dst_idx == DST_COUNTER) {
1031                         /* simple overflow catcher. This will fail
1032                          * terribly for non 32 or 64 bit counters
1033                          * ... are there any others in SNMP land?
1034                          */
1035                         if (pdp_new[ds_idx] < (double) 0.0)
1036                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1037                         if (pdp_new[ds_idx] < (double) 0.0)
1038                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1039                     }
1040                     rate = pdp_new[ds_idx] / interval;
1041                 } else {
1042                     pdp_new[ds_idx] = DNAN;
1043                 }
1044                 break;
1045             case DST_ABSOLUTE:
1046                 old_locale = setlocale(LC_NUMERIC, "C");
1047                 errno = 0;
1048                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1049                 setlocale(LC_NUMERIC, old_locale);
1050                 if (errno > 0) {
1051                     rrd_set_error("converting '%s' to float: %s",
1052                                   updvals[ds_idx + 1], rrd_strerror(errno));
1053                     return -1;
1054                 };
1055                 if (endptr[0] != '\0') {
1056                     rrd_set_error
1057                         ("conversion of '%s' to float not complete: tail '%s'",
1058                          updvals[ds_idx + 1], endptr);
1059                     return -1;
1060                 }
1061                 rate = pdp_new[ds_idx] / interval;
1062                 break;
1063             case DST_GAUGE:
1064                 errno = 0;
1065                 old_locale = setlocale(LC_NUMERIC, "C");
1066                 pdp_new[ds_idx] =
1067                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1068                 setlocale(LC_NUMERIC, old_locale);
1069                 if (errno) {
1070                     rrd_set_error("converting '%s' to float: %s",
1071                                   updvals[ds_idx + 1], rrd_strerror(errno));
1072                     return -1;
1073                 };
1074                 if (endptr[0] != '\0') {
1075                     rrd_set_error
1076                         ("conversion of '%s' to float not complete: tail '%s'",
1077                          updvals[ds_idx + 1], endptr);
1078                     return -1;
1079                 }
1080                 rate = pdp_new[ds_idx] / interval;
1081                 break;
1082             default:
1083                 rrd_set_error("rrd contains unknown DS type : '%s'",
1084                               rrd->ds_def[ds_idx].dst);
1085                 return -1;
1086             }
1087             /* break out of this for loop if the error string is set */
1088             if (rrd_test_error()) {
1089                 return -1;
1090             }
1091             /* make sure pdp_temp is neither too large or too small
1092              * if any of these occur it becomes unknown ...
1093              * sorry folks ... */
1094             if (!isnan(rate) &&
1095                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1096                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1097                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1098                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1099                 pdp_new[ds_idx] = DNAN;
1100             }
1101         } else {
1102             /* no news is news all the same */
1103             pdp_new[ds_idx] = DNAN;
1104         }
1105
1106
1107         /* make a copy of the command line argument for the next run */
1108 #ifdef DEBUG
1109         fprintf(stderr, "prep ds[%lu]\t"
1110                 "last_arg '%s'\t"
1111                 "this_arg '%s'\t"
1112                 "pdp_new %10.2f\n",
1113                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1114                 pdp_new[ds_idx]);
1115 #endif
1116         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1117                 LAST_DS_LEN - 1);
1118         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1119     }
1120     return 0;
1121 }
1122
1123 /*
1124  * How many PDP steps have elapsed since the last update? Returns the answer,
1125  * and stores the time between the last update and the last PDP in pre_time,
1126  * and the time between the last PDP and the current time in post_int.
1127  */
1128 static int calculate_elapsed_steps(
1129     rrd_t *rrd,
1130     unsigned long current_time,
1131     unsigned long current_time_usec,
1132     double interval,
1133     double *pre_int,
1134     double *post_int,
1135     unsigned long *proc_pdp_cnt)
1136 {
1137     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1138     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1139                                  * time */
1140     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1141                                  * when it was last updated */
1142     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1143
1144     /* when was the current pdp started */
1145     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1146     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1147
1148     /* when did the last pdp_st occur */
1149     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1150     occu_pdp_st = current_time - occu_pdp_age;
1151
1152     if (occu_pdp_st > proc_pdp_st) {
1153         /* OK we passed the pdp_st moment */
1154         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1155                                                                      * occurred before the latest
1156                                                                      * pdp_st moment*/
1157         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1158         *post_int = occu_pdp_age;   /* how much after it */
1159         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1160     } else {
1161         *pre_int = interval;
1162         *post_int = 0;
1163     }
1164
1165     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1166
1167 #ifdef DEBUG
1168     printf("proc_pdp_age %lu\t"
1169            "proc_pdp_st %lu\t"
1170            "occu_pfp_age %lu\t"
1171            "occu_pdp_st %lu\t"
1172            "int %lf\t"
1173            "pre_int %lf\t"
1174            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1175            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1176 #endif
1177
1178     /* compute the number of elapsed pdp_st moments */
1179     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1180 }
1181
1182 /*
1183  * Increment the PDP values by the values in pdp_new, or else initialize them.
1184  */
1185 static void simple_update(
1186     rrd_t *rrd,
1187     double interval,
1188     rrd_value_t *pdp_new)
1189 {
1190     int       i;
1191
1192     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1193         if (isnan(pdp_new[i])) {
1194             /* this is not really accurate if we use subsecond data arrival time
1195                should have thought of it when going subsecond resolution ...
1196                sorry next format change we will have it! */
1197             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1198                 floor(interval);
1199         } else {
1200             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1201                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1202             } else {
1203                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1204             }
1205         }
1206 #ifdef DEBUG
1207         fprintf(stderr,
1208                 "NO PDP  ds[%i]\t"
1209                 "value %10.2f\t"
1210                 "unkn_sec %5lu\n",
1211                 i,
1212                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1213                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1214 #endif
1215     }
1216 }
1217
1218 /*
1219  * Call process_pdp_st for each DS.
1220  *
1221  * Returns 0 on success, -1 on error.
1222  */
1223 static int process_all_pdp_st(
1224     rrd_t *rrd,
1225     double interval,
1226     double pre_int,
1227     double post_int,
1228     unsigned long elapsed_pdp_st,
1229     rrd_value_t *pdp_new,
1230     rrd_value_t *pdp_temp)
1231 {
1232     unsigned long ds_idx;
1233
1234     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1235        rate*seconds which occurred up to the last run.
1236        pdp_new[] contains rate*seconds from the latest run.
1237        pdp_temp[] will contain the rate for cdp */
1238
1239     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1240         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1241                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1242                            pdp_new, pdp_temp) == -1) {
1243             return -1;
1244         }
1245 #ifdef DEBUG
1246         fprintf(stderr, "PDP UPD ds[%lu]\t"
1247                 "elapsed_pdp_st %lu\t"
1248                 "pdp_temp %10.2f\t"
1249                 "new_prep %10.2f\t"
1250                 "new_unkn_sec %5lu\n",
1251                 ds_idx,
1252                 elapsed_pdp_st,
1253                 pdp_temp[ds_idx],
1254                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1255                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1256 #endif
1257     }
1258     return 0;
1259 }
1260
1261 /*
1262  * Process an update that occurs after one of the PDP moments.
1263  * Increments the PDP value, sets NAN if time greater than the
1264  * heartbeats have elapsed, processes CDEFs.
1265  *
1266  * Returns 0 on success, -1 on error.
1267  */
1268 static int process_pdp_st(
1269     rrd_t *rrd,
1270     unsigned long ds_idx,
1271     double interval,
1272     double pre_int,
1273     double post_int,
1274     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1275     rrd_value_t *pdp_new,
1276     rrd_value_t *pdp_temp)
1277 {
1278     int       i;
1279
1280     /* update pdp_prep to the current pdp_st. */
1281     double    pre_unknown = 0.0;
1282     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1283     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1284
1285     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1286
1287     rpnstack_init(&rpnstack);
1288
1289
1290     if (isnan(pdp_new[ds_idx])) {
1291         /* a final bit of unknown to be added before calculation
1292            we use a temporary variable for this so that we
1293            don't have to turn integer lines before using the value */
1294         pre_unknown = pre_int;
1295     } else {
1296         if (isnan(scratch[PDP_val].u_val)) {
1297             scratch[PDP_val].u_val = 0;
1298         }
1299         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1300     }
1301
1302     /* if too much of the pdp_prep is unknown we dump it */
1303     /* if the interval is larger thatn mrhb we get NAN */
1304     if ((interval > mrhb) ||
1305         (rrd->stat_head->pdp_step / 2.0 <
1306          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1307         pdp_temp[ds_idx] = DNAN;
1308     } else {
1309         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1310             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1311              pre_unknown);
1312     }
1313
1314     /* process CDEF data sources; remember each CDEF DS can
1315      * only reference other DS with a lower index number */
1316     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1317         rpnp_t   *rpnp;
1318
1319         rpnp =
1320             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1321         /* substitute data values for OP_VARIABLE nodes */
1322         for (i = 0; rpnp[i].op != OP_END; i++) {
1323             if (rpnp[i].op == OP_VARIABLE) {
1324                 rpnp[i].op = OP_NUMBER;
1325                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1326             }
1327         }
1328         /* run the rpn calculator */
1329         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1330             free(rpnp);
1331             rpnstack_free(&rpnstack);
1332             return -1;
1333         }
1334     }
1335
1336     /* make pdp_prep ready for the next run */
1337     if (isnan(pdp_new[ds_idx])) {
1338         /* this is not realy accurate if we use subsecond data arival time
1339            should have thought of it when going subsecond resolution ...
1340            sorry next format change we will have it! */
1341         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1342         scratch[PDP_val].u_val = DNAN;
1343     } else {
1344         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1345         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1346     }
1347     rpnstack_free(&rpnstack);
1348     return 0;
1349 }
1350
1351 /*
1352  * Iterate over all the RRAs for a given DS and:
1353  * 1. Decide whether to schedule a smooth later
1354  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1355  * 3. Update the CDP
1356  *
1357  * Returns 0 on success, -1 on error
1358  */
1359 static int update_all_cdp_prep(
1360     rrd_t *rrd,
1361     unsigned long *rra_step_cnt,
1362     unsigned long rra_begin,
1363     rrd_file_t *rrd_file,
1364     unsigned long elapsed_pdp_st,
1365     unsigned long proc_pdp_cnt,
1366     rrd_value_t **last_seasonal_coef,
1367     rrd_value_t **seasonal_coef,
1368     rrd_value_t *pdp_temp,
1369     unsigned long *rra_current,
1370     unsigned long *skip_update,
1371     int *schedule_smooth)
1372 {
1373     unsigned long rra_idx;
1374
1375     /* index into the CDP scratch array */
1376     enum cf_en current_cf;
1377     unsigned long rra_start;
1378
1379     /* number of rows to be updated in an RRA for a data value. */
1380     unsigned long start_pdp_offset;
1381
1382     rra_start = rra_begin;
1383     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1384         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1385         start_pdp_offset =
1386             rrd->rra_def[rra_idx].pdp_cnt -
1387             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1388         skip_update[rra_idx] = 0;
1389         if (start_pdp_offset <= elapsed_pdp_st) {
1390             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1391                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1392         } else {
1393             rra_step_cnt[rra_idx] = 0;
1394         }
1395
1396         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1397             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1398              * so that they will be correct for the next observed value; note that for
1399              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1400              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1401             if (rra_step_cnt[rra_idx] > 1) {
1402                 skip_update[rra_idx] = 1;
1403                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1404                                 elapsed_pdp_st, last_seasonal_coef);
1405                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1406                                 elapsed_pdp_st + 1, seasonal_coef);
1407             }
1408             /* periodically run a smoother for seasonal effects */
1409             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1410 #ifdef DEBUG
1411                 fprintf(stderr,
1412                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1413                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1414                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1415                         u_cnt);
1416 #endif
1417                 *schedule_smooth = 1;
1418             }
1419             *rra_current = rrd_tell(rrd_file);
1420         }
1421         if (rrd_test_error())
1422             return -1;
1423
1424         if (update_cdp_prep
1425             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1426              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1427              current_cf) == -1) {
1428             return -1;
1429         }
1430         rra_start +=
1431             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1432             sizeof(rrd_value_t);
1433     }
1434     return 0;
1435 }
1436
1437 /* 
1438  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1439  */
1440 static int do_schedule_smooth(
1441     rrd_t *rrd,
1442     unsigned long rra_idx,
1443     unsigned long elapsed_pdp_st)
1444 {
1445     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1446     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1447     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1448     unsigned long seasonal_smooth_idx =
1449         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1450     unsigned long *init_seasonal =
1451         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1452
1453     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1454      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1455      * really an RRA level, not a data source within RRA level parameter, but
1456      * the rra_def is read only for rrd_update (not flushed to disk). */
1457     if (*init_seasonal > BURNIN_CYCLES) {
1458         /* someone has no doubt invented a trick to deal with this wrap around,
1459          * but at least this code is clear. */
1460         if (seasonal_smooth_idx > cur_row) {
1461             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1462              * between PDP and CDP */
1463             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1464         }
1465         /* can't rely on negative numbers because we are working with
1466          * unsigned values */
1467         return (cur_row + elapsed_pdp_st >= row_cnt
1468                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1469     }
1470     /* mark off one of the burn-in cycles */
1471     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1472 }
1473
1474 /*
1475  * For a given RRA, iterate over the data sources and call the appropriate
1476  * consolidation function.
1477  *
1478  * Returns 0 on success, -1 on error.
1479  */
1480 static int update_cdp_prep(
1481     rrd_t *rrd,
1482     unsigned long elapsed_pdp_st,
1483     unsigned long start_pdp_offset,
1484     unsigned long *rra_step_cnt,
1485     int rra_idx,
1486     rrd_value_t *pdp_temp,
1487     rrd_value_t *last_seasonal_coef,
1488     rrd_value_t *seasonal_coef,
1489     int current_cf)
1490 {
1491     unsigned long ds_idx, cdp_idx;
1492
1493     /* update CDP_PREP areas */
1494     /* loop over data soures within each RRA */
1495     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1496
1497         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1498
1499         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1500             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1501                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1502                        elapsed_pdp_st, start_pdp_offset,
1503                        rrd->rra_def[rra_idx].pdp_cnt,
1504                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1505                        rra_idx, ds_idx);
1506         } else {
1507             /* Nothing to consolidate if there's one PDP per CDP. However, if
1508              * we've missed some PDPs, let's update null counters etc. */
1509             if (elapsed_pdp_st > 2) {
1510                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1511                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1512                           current_cf);
1513             }
1514         }
1515
1516         if (rrd_test_error())
1517             return -1;
1518     }                   /* endif data sources loop */
1519     return 0;
1520 }
1521
1522 /*
1523  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1524  * primary value, secondary value, and # of unknowns.
1525  */
1526 static void update_cdp(
1527     unival *scratch,
1528     int current_cf,
1529     rrd_value_t pdp_temp_val,
1530     unsigned long rra_step_cnt,
1531     unsigned long elapsed_pdp_st,
1532     unsigned long start_pdp_offset,
1533     unsigned long pdp_cnt,
1534     rrd_value_t xff,
1535     int i,
1536     int ii)
1537 {
1538     /* shorthand variables */
1539     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1540     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1541     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1542     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1543
1544     if (rra_step_cnt) {
1545         /* If we are in this block, as least 1 CDP value will be written to
1546          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1547          * to be written, then the "fill in" value is the CDP_secondary_val
1548          * entry. */
1549         if (isnan(pdp_temp_val)) {
1550             *cdp_unkn_pdp_cnt += start_pdp_offset;
1551             *cdp_secondary_val = DNAN;
1552         } else {
1553             /* CDP_secondary value is the RRA "fill in" value for intermediary
1554              * CDP data entries. No matter the CF, the value is the same because
1555              * the average, max, min, and last of a list of identical values is
1556              * the same, namely, the value itself. */
1557             *cdp_secondary_val = pdp_temp_val;
1558         }
1559
1560         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1561             *cdp_primary_val = DNAN;
1562             if (current_cf == CF_AVERAGE) {
1563                 *cdp_val =
1564                     initialize_average_carry_over(pdp_temp_val,
1565                                                   elapsed_pdp_st,
1566                                                   start_pdp_offset, pdp_cnt);
1567             } else {
1568                 *cdp_val = pdp_temp_val;
1569             }
1570         } else {
1571             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1572                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1573         }               /* endif meets xff value requirement for a valid value */
1574         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1575          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1576         if (isnan(pdp_temp_val))
1577             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1578         else
1579             *cdp_unkn_pdp_cnt = 0;
1580     } else {            /* rra_step_cnt[i]  == 0 */
1581
1582 #ifdef DEBUG
1583         if (isnan(*cdp_val)) {
1584             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1585                     i, ii);
1586         } else {
1587             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1588                     i, ii, *cdp_val);
1589         }
1590 #endif
1591         if (isnan(pdp_temp_val)) {
1592             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1593         } else {
1594             *cdp_val =
1595                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1596                                   current_cf, i, ii);
1597         }
1598     }
1599 }
1600
1601 /*
1602  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1603  * on the type of consolidation function.
1604  */
1605 static void initialize_cdp_val(
1606     unival *scratch,
1607     int current_cf,
1608     rrd_value_t pdp_temp_val,
1609     unsigned long elapsed_pdp_st,
1610     unsigned long start_pdp_offset,
1611     unsigned long pdp_cnt)
1612 {
1613     rrd_value_t cum_val, cur_val;
1614
1615     switch (current_cf) {
1616     case CF_AVERAGE:
1617         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1618         cur_val = IFDNAN(pdp_temp_val, 0.0);
1619         scratch[CDP_primary_val].u_val =
1620             (cum_val + cur_val * start_pdp_offset) /
1621             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1622         scratch[CDP_val].u_val =
1623             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1624                                           start_pdp_offset, pdp_cnt);
1625         break;
1626     case CF_MAXIMUM:
1627         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1628         cur_val = IFDNAN(pdp_temp_val, -DINF);
1629 #if 0
1630 #ifdef DEBUG
1631         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1632             fprintf(stderr,
1633                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1634                     i, ii);
1635             exit(-1);
1636         }
1637 #endif
1638 #endif
1639         if (cur_val > cum_val)
1640             scratch[CDP_primary_val].u_val = cur_val;
1641         else
1642             scratch[CDP_primary_val].u_val = cum_val;
1643         /* initialize carry over value */
1644         scratch[CDP_val].u_val = pdp_temp_val;
1645         break;
1646     case CF_MINIMUM:
1647         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1648         cur_val = IFDNAN(pdp_temp_val, DINF);
1649 #if 0
1650 #ifdef DEBUG
1651         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1652             fprintf(stderr,
1653                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1654                     ii);
1655             exit(-1);
1656         }
1657 #endif
1658 #endif
1659         if (cur_val < cum_val)
1660             scratch[CDP_primary_val].u_val = cur_val;
1661         else
1662             scratch[CDP_primary_val].u_val = cum_val;
1663         /* initialize carry over value */
1664         scratch[CDP_val].u_val = pdp_temp_val;
1665         break;
1666     case CF_LAST:
1667     default:
1668         scratch[CDP_primary_val].u_val = pdp_temp_val;
1669         /* initialize carry over value */
1670         scratch[CDP_val].u_val = pdp_temp_val;
1671         break;
1672     }
1673 }
1674
1675 /*
1676  * Update the consolidation function for Holt-Winters functions as
1677  * well as other functions that don't actually consolidate multiple
1678  * PDPs.
1679  */
1680 static void reset_cdp(
1681     rrd_t *rrd,
1682     unsigned long elapsed_pdp_st,
1683     rrd_value_t *pdp_temp,
1684     rrd_value_t *last_seasonal_coef,
1685     rrd_value_t *seasonal_coef,
1686     int rra_idx,
1687     int ds_idx,
1688     int cdp_idx,
1689     enum cf_en current_cf)
1690 {
1691     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1692
1693     switch (current_cf) {
1694     case CF_AVERAGE:
1695     default:
1696         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1697         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1698         break;
1699     case CF_SEASONAL:
1700     case CF_DEVSEASONAL:
1701         /* need to update cached seasonal values, so they are consistent
1702          * with the bulk update */
1703         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1704          * CDP_last_deviation are the same. */
1705         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1706         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1707         break;
1708     case CF_HWPREDICT:
1709     case CF_MHWPREDICT:
1710         /* need to update the null_count and last_null_count.
1711          * even do this for non-DNAN pdp_temp because the
1712          * algorithm is not learning from batch updates. */
1713         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1714         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1715         /* fall through */
1716     case CF_DEVPREDICT:
1717         scratch[CDP_primary_val].u_val = DNAN;
1718         scratch[CDP_secondary_val].u_val = DNAN;
1719         break;
1720     case CF_FAILURES:
1721         /* do not count missed bulk values as failures */
1722         scratch[CDP_primary_val].u_val = 0;
1723         scratch[CDP_secondary_val].u_val = 0;
1724         /* need to reset violations buffer.
1725          * could do this more carefully, but for now, just
1726          * assume a bulk update wipes away all violations. */
1727         erase_violations(rrd, cdp_idx, rra_idx);
1728         break;
1729     }
1730 }
1731
1732 static rrd_value_t initialize_average_carry_over(
1733     rrd_value_t pdp_temp_val,
1734     unsigned long elapsed_pdp_st,
1735     unsigned long start_pdp_offset,
1736     unsigned long pdp_cnt)
1737 {
1738     /* initialize carry over value */
1739     if (isnan(pdp_temp_val)) {
1740         return DNAN;
1741     }
1742     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1743 }
1744
1745 /*
1746  * Update or initialize a CDP value based on the consolidation
1747  * function.
1748  *
1749  * Returns the new value.
1750  */
1751 static rrd_value_t calculate_cdp_val(
1752     rrd_value_t cdp_val,
1753     rrd_value_t pdp_temp_val,
1754     unsigned long elapsed_pdp_st,
1755     int current_cf,
1756 #ifdef DEBUG
1757     int i,
1758     int ii
1759 #else
1760     int UNUSED(i),
1761     int UNUSED(ii)
1762 #endif
1763     )
1764 {
1765     if (isnan(cdp_val)) {
1766         if (current_cf == CF_AVERAGE) {
1767             pdp_temp_val *= elapsed_pdp_st;
1768         }
1769 #ifdef DEBUG
1770         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1771                 i, ii, pdp_temp_val);
1772 #endif
1773         return pdp_temp_val;
1774     }
1775     if (current_cf == CF_AVERAGE)
1776         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1777     if (current_cf == CF_MINIMUM)
1778         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1779     if (current_cf == CF_MAXIMUM)
1780         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1781
1782     return pdp_temp_val;
1783 }
1784
1785 /*
1786  * For each RRA, update the seasonal values and then call update_aberrant_CF
1787  * for each data source.
1788  *
1789  * Return 0 on success, -1 on error.
1790  */
1791 static int update_aberrant_cdps(
1792     rrd_t *rrd,
1793     rrd_file_t *rrd_file,
1794     unsigned long rra_begin,
1795     unsigned long *rra_current,
1796     unsigned long elapsed_pdp_st,
1797     rrd_value_t *pdp_temp,
1798     rrd_value_t **seasonal_coef)
1799 {
1800     unsigned long rra_idx, ds_idx, j;
1801
1802     /* number of PDP steps since the last update that
1803      * are assigned to the first CDP to be generated
1804      * since the last update. */
1805     unsigned short scratch_idx;
1806     unsigned long rra_start;
1807     enum cf_en current_cf;
1808
1809     /* this loop is only entered if elapsed_pdp_st < 3 */
1810     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1811          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1812         rra_start = rra_begin;
1813         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1814             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1815                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1816                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1817                     if (scratch_idx == CDP_primary_val) {
1818                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1819                                         elapsed_pdp_st + 1, seasonal_coef);
1820                     } else {
1821                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1822                                         elapsed_pdp_st + 2, seasonal_coef);
1823                     }
1824                     *rra_current = rrd_tell(rrd_file);
1825                 }
1826                 if (rrd_test_error())
1827                     return -1;
1828                 /* loop over data soures within each RRA */
1829                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1830                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1831                                        rra_idx * (rrd->stat_head->ds_cnt) +
1832                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1833                                        *seasonal_coef);
1834                 }
1835             }
1836             rra_start += rrd->rra_def[rra_idx].row_cnt
1837                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1838         }
1839     }
1840     return 0;
1841 }
1842
1843 /* 
1844  * Move sequentially through the file, writing one RRA at a time.  Note this
1845  * architecture divorces the computation of CDP with flushing updated RRA
1846  * entries to disk.
1847  *
1848  * Return 0 on success, -1 on error.
1849  */
1850 static int write_to_rras(
1851     rrd_t *rrd,
1852     rrd_file_t *rrd_file,
1853     unsigned long *rra_step_cnt,
1854     unsigned long rra_begin,
1855     unsigned long *rra_current,
1856     time_t current_time,
1857     unsigned long *skip_update,
1858     rrd_info_t ** pcdp_summary)
1859 {
1860     unsigned long rra_idx;
1861     unsigned long rra_start;
1862     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1863     time_t    rra_time = 0; /* time of update for a RRA */
1864
1865     /* Ready to write to disk */
1866     rra_start = rra_begin;
1867     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1868         /* skip unless there's something to write */
1869         if (rra_step_cnt[rra_idx]) {
1870             /* write the first row */
1871 #ifdef DEBUG
1872             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1873 #endif
1874             rrd->rra_ptr[rra_idx].cur_row++;
1875             if (rrd->rra_ptr[rra_idx].cur_row >=
1876                 rrd->rra_def[rra_idx].row_cnt)
1877                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1878             /* position on the first row */
1879             rra_pos_tmp = rra_start +
1880                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1881                 sizeof(rrd_value_t);
1882             if (rra_pos_tmp != *rra_current) {
1883                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1884                     rrd_set_error("seek error in rrd");
1885                     return -1;
1886                 }
1887                 *rra_current = rra_pos_tmp;
1888             }
1889 #ifdef DEBUG
1890             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1891 #endif
1892             if (!skip_update[rra_idx]) {
1893                 if (*pcdp_summary != NULL) {
1894                     rra_time = (current_time - current_time
1895                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1896                                    rrd->stat_head->pdp_step))
1897                         -
1898                         ((rra_step_cnt[rra_idx] -
1899                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1900                          rrd->stat_head->pdp_step);
1901                 }
1902                 if (write_RRA_row
1903                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1904                      pcdp_summary, rra_time) == -1)
1905                     return -1;
1906             }
1907
1908             /* write other rows of the bulk update, if any */
1909             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1910                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1911                     rrd->rra_def[rra_idx].row_cnt) {
1912 #ifdef DEBUG
1913                     fprintf(stderr,
1914                             "Wraparound for RRA %s, %lu updates left\n",
1915                             rrd->rra_def[rra_idx].cf_nam,
1916                             rra_step_cnt[rra_idx] - 1);
1917 #endif
1918                     /* wrap */
1919                     rrd->rra_ptr[rra_idx].cur_row = 0;
1920                     /* seek back to beginning of current rra */
1921                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1922                         rrd_set_error("seek error in rrd");
1923                         return -1;
1924                     }
1925 #ifdef DEBUG
1926                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1927                             rrd_file->pos);
1928 #endif
1929                     *rra_current = rra_start;
1930                 }
1931                 if (!skip_update[rra_idx]) {
1932                     if (*pcdp_summary != NULL) {
1933                         rra_time = (current_time - current_time
1934                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1935                                        rrd->stat_head->pdp_step))
1936                             -
1937                             ((rra_step_cnt[rra_idx] -
1938                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1939                              rrd->stat_head->pdp_step);
1940                     }
1941                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1942                                       CDP_secondary_val, pcdp_summary,
1943                                       rra_time) == -1)
1944                         return -1;
1945                 }
1946             }
1947         }
1948         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1949             sizeof(rrd_value_t);
1950     }                   /* RRA LOOP */
1951
1952     return 0;
1953 }
1954
1955 /*
1956  * Write out one row of values (one value per DS) to the archive.
1957  *
1958  * Returns 0 on success, -1 on error.
1959  */
1960 static int write_RRA_row(
1961     rrd_file_t *rrd_file,
1962     rrd_t *rrd,
1963     unsigned long rra_idx,
1964     unsigned long *rra_current,
1965     unsigned short CDP_scratch_idx,
1966     rrd_info_t ** pcdp_summary,
1967     time_t rra_time)
1968 {
1969     unsigned long ds_idx, cdp_idx;
1970     rrd_infoval_t iv;
1971
1972     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1973         /* compute the cdp index */
1974         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1975 #ifdef DEBUG
1976         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1977                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1978                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1979 #endif
1980         if (*pcdp_summary != NULL) {
1981             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1982             /* append info to the return hash */
1983             *pcdp_summary = rrd_info_push(*pcdp_summary,
1984                                           sprintf_alloc
1985                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1986                                            rrd->rra_def[rra_idx].cf_nam,
1987                                            rrd->rra_def[rra_idx].pdp_cnt,
1988                                            rrd->ds_def[ds_idx].ds_nam),
1989                                           RD_I_VAL, iv);
1990         }
1991         if (rrd_write(rrd_file,
1992                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1993                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1994             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1995             return -1;
1996         }
1997         *rra_current += sizeof(rrd_value_t);
1998     }
1999     return 0;
2000 }
2001
2002 /*
2003  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2004  *
2005  * Returns 0 on success, -1 otherwise
2006  */
2007 static int smooth_all_rras(
2008     rrd_t *rrd,
2009     rrd_file_t *rrd_file,
2010     unsigned long rra_begin)
2011 {
2012     unsigned long rra_start = rra_begin;
2013     unsigned long rra_idx;
2014
2015     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2016         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2017             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2018 #ifdef DEBUG
2019             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2020 #endif
2021             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2022             if (rrd_test_error())
2023                 return -1;
2024         }
2025         rra_start += rrd->rra_def[rra_idx].row_cnt
2026             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2027     }
2028     return 0;
2029 }
2030
2031 #ifndef HAVE_MMAP
2032 /*
2033  * Flush changes to disk (unless we're using mmap)
2034  *
2035  * Returns 0 on success, -1 otherwise
2036  */
2037 static int write_changes_to_disk(
2038     rrd_t *rrd,
2039     rrd_file_t *rrd_file,
2040     int version)
2041 {
2042     /* we just need to write back the live header portion now */
2043     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2044                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2045                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2046                  SEEK_SET) != 0) {
2047         rrd_set_error("seek rrd for live header writeback");
2048         return -1;
2049     }
2050     if (version >= 3) {
2051         if (rrd_write(rrd_file, rrd->live_head,
2052                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2053             rrd_set_error("rrd_write live_head to rrd");
2054             return -1;
2055         }
2056     } else {
2057         if (rrd_write(rrd_file, rrd->legacy_last_up,
2058                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2059             rrd_set_error("rrd_write live_head to rrd");
2060             return -1;
2061         }
2062     }
2063
2064
2065     if (rrd_write(rrd_file, rrd->pdp_prep,
2066                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2067         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2068         rrd_set_error("rrd_write pdp_prep to rrd");
2069         return -1;
2070     }
2071
2072     if (rrd_write(rrd_file, rrd->cdp_prep,
2073                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2074                   rrd->stat_head->ds_cnt)
2075         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2076                       rrd->stat_head->ds_cnt)) {
2077
2078         rrd_set_error("rrd_write cdp_prep to rrd");
2079         return -1;
2080     }
2081
2082     if (rrd_write(rrd_file, rrd->rra_ptr,
2083                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2084         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2085         rrd_set_error("rrd_write rra_ptr to rrd");
2086         return -1;
2087     }
2088     return 0;
2089 }
2090 #endif