fix use of setlocale all over the place ...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.3  Copyright by Tobi Oetiker, 1997-2010
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #include "rrd_client.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
63
64 /* FUNCTION PROTOTYPES */
65
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
77
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
88
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
94
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
112
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
122
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
130
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
136
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
145
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
150
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
159
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
169
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
217
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
228
229 static rrd_value_t initialize_carry_over(
230     rrd_value_t pdp_temp_val,
231     int         current_cf,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
251
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
260
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
268
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
273
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
280
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
287 {
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
292 }
293
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
302 {
303     struct timeval tmp_time;    /* used for time conversion */
304
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
313 }
314
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
316
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
320 {
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
329
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
333
334     while (1) {
335         int       option_index = 0;
336         int       opt;
337
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339
340         if (opt == EOF)
341             break;
342
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
347
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
353
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
362
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
376 }
377
378 int rrd_update(
379     int argc,
380     char **argv)
381 {
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
392
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
395
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
398
399         if (opt == EOF)
400             break;
401
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
406
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
417
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
423
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
429
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) return status;
433     }
434
435     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436     {
437         rrd_set_error("The caching daemon cannot be used together with "
438                 "templates yet.");
439         goto out;
440     }
441
442     if (! rrdc_is_connected(opt_daemon))
443     {
444       rc = rrd_update_r(argv[optind], tmplt,
445                         argc - optind - 1, (const char **) (argv + optind + 1));
446     }
447     else /* we are connected */
448     {
449         rc = rrdc_update (argv[optind], /* file */
450                           argc - optind - 1, /* values_num */
451                           (const char *const *) (argv + optind + 1)); /* values */
452         if (rc > 0)
453             rrd_set_error("Failed sending the values to rrdcached: %s",
454                           rrd_strerror (rc));
455     }
456
457   out:
458     if (tmplt != NULL)
459     {
460         free(tmplt);
461         tmplt = NULL;
462     }
463     if (opt_daemon != NULL)
464     {
465         free (opt_daemon);
466         opt_daemon = NULL;
467     }
468     return rc;
469 }
470
471 int rrd_update_r(
472     const char *filename,
473     const char *tmplt,
474     int argc,
475     const char **argv)
476 {
477     return _rrd_update(filename, tmplt, argc, argv, NULL);
478 }
479
480 int rrd_update_v_r(
481     const char *filename,
482     const char *tmplt,
483     int argc,
484     const char **argv,
485     rrd_info_t * pcdp_summary)
486 {
487     return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
488 }
489
490 int _rrd_update(
491     const char *filename,
492     const char *tmplt,
493     int argc,
494     const char **argv,
495     rrd_info_t * pcdp_summary)
496 {
497
498     int       arg_i = 2;
499
500     unsigned long rra_begin;    /* byte pointer to the rra
501                                  * area in the rrd file.  this
502                                  * pointer never changes value */
503     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
504                              * to the existing entry */
505     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
506                              * to the cdp values */
507
508     long     *tmpl_idx; /* index representing the settings
509                          * transported by the tmplt index */
510     unsigned long tmpl_cnt = 2; /* time and data */
511     rrd_t     rrd;
512     time_t    current_time = 0;
513     unsigned long current_time_usec = 0;    /* microseconds part of current time */
514     char    **updvals;
515     int       schedule_smooth = 0;
516
517     /* number of elapsed PDP steps since last update */
518     unsigned long *rra_step_cnt = NULL;
519
520     int       version;  /* rrd version */
521     rrd_file_t *rrd_file;
522     char     *arg_copy; /* for processing the argv */
523     unsigned long *skip_update; /* RRAs to advance but not write */
524
525     /* need at least 1 arguments: data. */
526     if (argc < 1) {
527         rrd_set_error("Not enough arguments");
528         goto err_out;
529     }
530
531     rrd_init(&rrd);
532     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533         goto err_free;
534     }
535     /* We are now at the beginning of the rra's */
536     rra_begin = rrd_file->header_len;
537
538     version = atoi(rrd.stat_head->version);
539
540     initialize_time(&current_time, &current_time_usec, version);
541
542     /* get exclusive lock to whole file.
543      * lock gets removed when we close the file.
544      */
545     if (rrd_lock(rrd_file) != 0) {
546         rrd_set_error("could not lock RRD");
547         goto err_close;
548     }
549
550     if (allocate_data_structures(&rrd, &updvals,
551                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552                                  &rra_step_cnt, &skip_update,
553                                  &pdp_new) == -1) {
554         goto err_close;
555     }
556
557     /* loop through the arguments. */
558     for (arg_i = 0; arg_i < argc; arg_i++) {
559         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560             rrd_set_error("failed duplication argv entry");
561             break;
562         }
563         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
564                         &current_time, &current_time_usec, pdp_temp, pdp_new,
565                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566                         &pcdp_summary, version, skip_update,
567                         &schedule_smooth) == -1) {
568             if (rrd_test_error()) { /* Should have error string always here */
569                 char     *save_error;
570
571                 /* Prepend file name to error message */
572                 if ((save_error = strdup(rrd_get_error())) != NULL) {
573                     rrd_set_error("%s: %s", filename, save_error);
574                     free(save_error);
575                 }
576             }
577             free(arg_copy);
578             break;
579         }
580         free(arg_copy);
581     }
582
583     free(rra_step_cnt);
584
585     /* if we got here and if there is an error and if the file has not been
586      * written to, then close things up and return. */
587     if (rrd_test_error()) {
588         goto err_free_structures;
589     }
590 #ifndef HAVE_MMAP
591     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592         goto err_free_structures;
593     }
594 #endif
595
596     /* calling the smoothing code here guarantees at most one smoothing
597      * operation per rrd_update call. Unfortunately, it is possible with bulk
598      * updates, or a long-delayed update for smoothing to occur off-schedule.
599      * This really isn't critical except during the burn-in cycles. */
600     if (schedule_smooth) {
601         smooth_all_rras(&rrd, rrd_file, rra_begin);
602     }
603
604 /*    rrd_dontneed(rrd_file,&rrd); */
605     rrd_free(&rrd);
606     rrd_close(rrd_file);
607
608     free(pdp_new);
609     free(tmpl_idx);
610     free(pdp_temp);
611     free(skip_update);
612     free(updvals);
613     return 0;
614
615   err_free_structures:
616     free(pdp_new);
617     free(tmpl_idx);
618     free(pdp_temp);
619     free(skip_update);
620     free(updvals);
621   err_close:
622     rrd_close(rrd_file);
623   err_free:
624     rrd_free(&rrd);
625   err_out:
626     return -1;
627 }
628
629 /*
630  * Allocate some important arrays used, and initialize the template.
631  *
632  * When it returns, either all of the structures are allocated
633  * or none of them are.
634  *
635  * Returns 0 on success, -1 on error.
636  */
637 static int allocate_data_structures(
638     rrd_t *rrd,
639     char ***updvals,
640     rrd_value_t **pdp_temp,
641     const char *tmplt,
642     long **tmpl_idx,
643     unsigned long *tmpl_cnt,
644     unsigned long **rra_step_cnt,
645     unsigned long **skip_update,
646     rrd_value_t **pdp_new)
647 {
648     unsigned  i, ii;
649     if ((*updvals = (char **) malloc(sizeof(char *)
650                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651         rrd_set_error("allocating updvals pointer array.");
652         return -1;
653     }
654     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655                                             * rrd->stat_head->ds_cnt)) ==
656         NULL) {
657         rrd_set_error("allocating pdp_temp.");
658         goto err_free_updvals;
659     }
660     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
661                                                  *
662                                                  rrd->stat_head->rra_cnt)) ==
663         NULL) {
664         rrd_set_error("allocating skip_update.");
665         goto err_free_pdp_temp;
666     }
667     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669         rrd_set_error("allocating tmpl_idx.");
670         goto err_free_skip_update;
671     }
672     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
673                                                   *
674                                                   (rrd->stat_head->
675                                                    rra_cnt))) == NULL) {
676         rrd_set_error("allocating rra_step_cnt.");
677         goto err_free_tmpl_idx;
678     }
679
680     /* initialize tmplt redirector */
681     /* default config example (assume DS 1 is a CDEF DS)
682        tmpl_idx[0] -> 0; (time)
683        tmpl_idx[1] -> 1; (DS 0)
684        tmpl_idx[2] -> 3; (DS 2)
685        tmpl_idx[3] -> 4; (DS 3) */
686     (*tmpl_idx)[0] = 0; /* time */
687     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689             (*tmpl_idx)[ii++] = i;
690     }
691     *tmpl_cnt = ii;
692
693     if (tmplt != NULL) {
694         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695             goto err_free_rra_step_cnt;
696         }
697     }
698
699     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700                                            * rrd->stat_head->ds_cnt)) == NULL) {
701         rrd_set_error("allocating pdp_new.");
702         goto err_free_rra_step_cnt;
703     }
704
705     return 0;
706
707   err_free_rra_step_cnt:
708     free(*rra_step_cnt);
709   err_free_tmpl_idx:
710     free(*tmpl_idx);
711   err_free_skip_update:
712     free(*skip_update);
713   err_free_pdp_temp:
714     free(*pdp_temp);
715   err_free_updvals:
716     free(*updvals);
717     return -1;
718 }
719
720 /*
721  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
722  *
723  * Returns 0 on success.
724  */
725 static int parse_template(
726     rrd_t *rrd,
727     const char *tmplt,
728     unsigned long *tmpl_cnt,
729     long *tmpl_idx)
730 {
731     char     *dsname, *tmplt_copy;
732     unsigned int tmpl_len, i;
733     int       ret = 0;
734
735     *tmpl_cnt = 1;      /* the first entry is the time */
736
737     /* we should work on a writeable copy here */
738     if ((tmplt_copy = strdup(tmplt)) == NULL) {
739         rrd_set_error("error copying tmplt '%s'", tmplt);
740         ret = -1;
741         goto out;
742     }
743
744     dsname = tmplt_copy;
745     tmpl_len = strlen(tmplt_copy);
746     for (i = 0; i <= tmpl_len; i++) {
747         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748             tmplt_copy[i] = '\0';
749             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750                 rrd_set_error("tmplt contains more DS definitions than RRD");
751                 ret = -1;
752                 goto out_free_tmpl_copy;
753             }
754             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755                 rrd_set_error("unknown DS name '%s'", dsname);
756                 ret = -1;
757                 goto out_free_tmpl_copy;
758             }
759             /* go to the next entry on the tmplt_copy */
760             if (i < tmpl_len)
761                 dsname = &tmplt_copy[i + 1];
762         }
763     }
764   out_free_tmpl_copy:
765     free(tmplt_copy);
766   out:
767     return ret;
768 }
769
770 /*
771  * Parse an update string, updates the primary data points (PDPs)
772  * and consolidated data points (CDPs), and writes changes to the RRAs.
773  *
774  * Returns 0 on success, -1 on error.
775  */
776 static int process_arg(
777     char *step_start,
778     rrd_t *rrd,
779     rrd_file_t *rrd_file,
780     unsigned long rra_begin,
781     time_t *current_time,
782     unsigned long *current_time_usec,
783     rrd_value_t *pdp_temp,
784     rrd_value_t *pdp_new,
785     unsigned long *rra_step_cnt,
786     char **updvals,
787     long *tmpl_idx,
788     unsigned long tmpl_cnt,
789     rrd_info_t ** pcdp_summary,
790     int version,
791     unsigned long *skip_update,
792     int *schedule_smooth)
793 {
794     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
795
796     /* a vector of future Holt-Winters seasonal coefs */
797     unsigned long elapsed_pdp_st;
798
799     double    interval, pre_int, post_int;  /* interval between this and
800                                              * the last run */
801     unsigned long proc_pdp_cnt;
802
803     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804                  current_time, current_time_usec, version) == -1) {
805         return -1;
806     }
807
808     interval = (double) (*current_time - rrd->live_head->last_up)
809         + (double) ((long) *current_time_usec -
810                     (long) rrd->live_head->last_up_usec) / 1e6f;
811
812     /* process the data sources and update the pdp_prep 
813      * area accordingly */
814     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
815         return -1;
816     }
817
818     elapsed_pdp_st = calculate_elapsed_steps(rrd,
819                                              *current_time,
820                                              *current_time_usec, interval,
821                                              &pre_int, &post_int,
822                                              &proc_pdp_cnt);
823
824     /* has a pdp_st moment occurred since the last run ? */
825     if (elapsed_pdp_st == 0) {
826         /* no we have not passed a pdp_st moment. therefore update is simple */
827         simple_update(rrd, interval, pdp_new);
828     } else {
829         /* an pdp_st has occurred. */
830         if (process_all_pdp_st(rrd, interval,
831                                pre_int, post_int,
832                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
833             return -1;
834         }
835         if (update_all_cdp_prep(rrd, rra_step_cnt,
836                                 rra_begin, rrd_file,
837                                 elapsed_pdp_st,
838                                 proc_pdp_cnt,
839                                 &last_seasonal_coef,
840                                 &seasonal_coef,
841                                 pdp_temp,
842                                 skip_update, schedule_smooth) == -1) {
843             goto err_free_coefficients;
844         }
845         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846                                  elapsed_pdp_st, pdp_temp,
847                                  &seasonal_coef) == -1) {
848             goto err_free_coefficients;
849         }
850         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851                           *current_time, skip_update,
852                           pcdp_summary) == -1) {
853             goto err_free_coefficients;
854         }
855     }                   /* endif a pdp_st has occurred */
856     rrd->live_head->last_up = *current_time;
857     rrd->live_head->last_up_usec = *current_time_usec;
858
859     if (version < 3) {
860         *rrd->legacy_last_up = rrd->live_head->last_up;
861     }
862     free(seasonal_coef);
863     free(last_seasonal_coef);
864     return 0;
865
866   err_free_coefficients:
867     free(seasonal_coef);
868     free(last_seasonal_coef);
869     return -1;
870 }
871
872 /*
873  * Parse a DS string (time + colon-separated values), storing the
874  * results in current_time, current_time_usec, and updvals.
875  *
876  * Returns 0 on success, -1 on error.
877  */
878 static int parse_ds(
879     rrd_t *rrd,
880     char **updvals,
881     long *tmpl_idx,
882     char *input,
883     unsigned long tmpl_cnt,
884     time_t *current_time,
885     unsigned long *current_time_usec,
886     int version)
887 {
888     char     *p;
889     unsigned long i;
890     char      timesyntax;
891
892     updvals[0] = input;
893     /* initialize all ds input to unknown except the first one
894        which has always got to be set */
895     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
896         updvals[i] = "U";
897
898     /* separate all ds elements; first must be examined separately
899        due to alternate time syntax */
900     if ((p = strchr(input, '@')) != NULL) {
901         timesyntax = '@';
902     } else if ((p = strchr(input, ':')) != NULL) {
903         timesyntax = ':';
904     } else {
905         rrd_set_error("expected timestamp not found in data source from %s",
906                       input);
907         return -1;
908     }
909     *p = '\0';
910     i = 1;
911     updvals[tmpl_idx[i++]] = p + 1;
912     while (*(++p)) {
913         if (*p == ':') {
914             *p = '\0';
915             if (i < tmpl_cnt) {
916                 updvals[tmpl_idx[i++]] = p + 1;
917             }
918             else {
919                 rrd_set_error("found extra data on update argument: %s",p+1);
920                 return -1;
921             }                
922         }
923     }
924
925     if (i != tmpl_cnt) {
926         rrd_set_error("expected %lu data source readings (got %lu) from %s",
927                       tmpl_cnt - 1, i - 1, input);
928         return -1;
929     }
930
931     if (get_time_from_reading(rrd, timesyntax, updvals,
932                               current_time, current_time_usec,
933                               version) == -1) {
934         return -1;
935     }
936     return 0;
937 }
938
939 /*
940  * Parse the time in a DS string, store it in current_time and 
941  * current_time_usec and verify that it's later than the last
942  * update for this DS.
943  *
944  * Returns 0 on success, -1 on error.
945  */
946 static int get_time_from_reading(
947     rrd_t *rrd,
948     char timesyntax,
949     char **updvals,
950     time_t *current_time,
951     unsigned long *current_time_usec,
952     int version)
953 {
954     double    tmp;
955     char     *parsetime_error = NULL;
956     char     *old_locale;
957     rrd_time_value_t ds_tv;
958     struct timeval tmp_time;    /* used for time conversion */
959
960     /* get the time from the reading ... handle N */
961     if (timesyntax == '@') {    /* at-style */
962         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
964             return -1;
965         }
966         if (ds_tv.type == RELATIVE_TO_END_TIME ||
967             ds_tv.type == RELATIVE_TO_START_TIME) {
968             rrd_set_error("specifying time relative to the 'start' "
969                           "or 'end' makes no sense here: %s", updvals[0]);
970             return -1;
971         }
972         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974     } else if (strcmp(updvals[0], "N") == 0) {
975         gettimeofday(&tmp_time, 0);
976         normalize_time(&tmp_time);
977         *current_time = tmp_time.tv_sec;
978         *current_time_usec = tmp_time.tv_usec;
979     } else {
980         old_locale = setlocale(LC_NUMERIC, NULL);
981         setlocale(LC_NUMERIC, "C");
982         errno = 0;
983         tmp = strtod(updvals[0], 0);
984         if (errno > 0) {
985             rrd_set_error("converting '%s' to float: %s",
986                 updvals[0], rrd_strerror(errno));
987             return -1;
988         };
989         setlocale(LC_NUMERIC, old_locale);
990         if (tmp < 0.0){
991             gettimeofday(&tmp_time, 0);
992             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
993         }
994
995         *current_time = floor(tmp);
996         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
997     }
998     /* dont do any correction for old version RRDs */
999     if (version < 3)
1000         *current_time_usec = 0;
1001
1002     if (*current_time < rrd->live_head->last_up ||
1003         (*current_time == rrd->live_head->last_up &&
1004          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1005         rrd_set_error("illegal attempt to update using time %ld when "
1006                       "last update time is %ld (minimum one second step)",
1007                       *current_time, rrd->live_head->last_up);
1008         return -1;
1009     }
1010     return 0;
1011 }
1012
1013 /*
1014  * Update pdp_new by interpreting the updvals according to the DS type
1015  * (COUNTER, GAUGE, etc.).
1016  *
1017  * Returns 0 on success, -1 on error.
1018  */
1019 static int update_pdp_prep(
1020     rrd_t *rrd,
1021     char **updvals,
1022     rrd_value_t *pdp_new,
1023     double interval)
1024 {
1025     unsigned long ds_idx;
1026     int       ii;
1027     char     *endptr;   /* used in the conversion */
1028     double    rate;
1029     char     *old_locale;
1030     enum dst_en dst_idx;
1031
1032     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1033         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1034
1035         /* make sure we do not build diffs with old last_ds values */
1036         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1037             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1038             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1039         }
1040
1041         /* NOTE: DST_CDEF should never enter this if block, because
1042          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1043          * accidently specified a value for the DST_CDEF. To handle this case,
1044          * an extra check is required. */
1045
1046         if ((updvals[ds_idx + 1][0] != 'U') &&
1047             (dst_idx != DST_CDEF) &&
1048             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1049             rate = DNAN;
1050
1051             /* pdp_new contains rate * time ... eg the bytes transferred during
1052              * the interval. Doing it this way saves a lot of math operations
1053              */
1054             switch (dst_idx) {
1055             case DST_COUNTER:
1056             case DST_DERIVE:
1057                 /* Check if this is a valid integer. `U' is already handled in
1058                  * another branch. */
1059                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1060                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1061                             && (updvals[ds_idx + 1][ii] == '-'))
1062                         continue;
1063
1064                     if ((updvals[ds_idx + 1][ii] < '0')
1065                             || (updvals[ds_idx + 1][ii] > '9')) {
1066                         rrd_set_error("not a simple %s integer: '%s'",
1067                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1068                                 updvals[ds_idx + 1]);
1069                         return -1;
1070                     }
1071                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1072
1073                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1074                     pdp_new[ds_idx] =
1075                         rrd_diff(updvals[ds_idx + 1],
1076                                  rrd->pdp_prep[ds_idx].last_ds);
1077                     if (dst_idx == DST_COUNTER) {
1078                         /* simple overflow catcher. This will fail
1079                          * terribly for non 32 or 64 bit counters
1080                          * ... are there any others in SNMP land?
1081                          */
1082                         if (pdp_new[ds_idx] < (double) 0.0)
1083                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1084                         if (pdp_new[ds_idx] < (double) 0.0)
1085                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1086                     }
1087                     rate = pdp_new[ds_idx] / interval;
1088                 } else {
1089                     pdp_new[ds_idx] = DNAN;
1090                 }
1091                 break;
1092             case DST_ABSOLUTE:
1093                 old_locale = setlocale(LC_NUMERIC, NULL);
1094                 setlocale(LC_NUMERIC, "C");
1095                 errno = 0;
1096                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1097                 if (errno > 0) {
1098                     rrd_set_error("converting '%s' to float: %s",
1099                                   updvals[ds_idx + 1], rrd_strerror(errno));
1100                     return -1;
1101                 };
1102                 setlocale(LC_NUMERIC, old_locale);
1103                 if (endptr[0] != '\0') {
1104                     rrd_set_error
1105                         ("conversion of '%s' to float not complete: tail '%s'",
1106                          updvals[ds_idx + 1], endptr);
1107                     return -1;
1108                 }
1109                 rate = pdp_new[ds_idx] / interval;
1110                 break;
1111             case DST_GAUGE:
1112                 old_locale = setlocale(LC_NUMERIC, NULL);
1113                 setlocale(LC_NUMERIC, "C");
1114                 errno = 0;
1115                 pdp_new[ds_idx] =
1116                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1117                 if (errno) {
1118                     rrd_set_error("converting '%s' to float: %s",
1119                                   updvals[ds_idx + 1], rrd_strerror(errno));
1120                     return -1;
1121                 };
1122                 setlocale(LC_NUMERIC, old_locale);
1123                 if (endptr[0] != '\0') {
1124                     rrd_set_error
1125                         ("conversion of '%s' to float not complete: tail '%s'",
1126                          updvals[ds_idx + 1], endptr);
1127                     return -1;
1128                 }
1129                 rate = pdp_new[ds_idx] / interval;
1130                 break;
1131             default:
1132                 rrd_set_error("rrd contains unknown DS type : '%s'",
1133                               rrd->ds_def[ds_idx].dst);
1134                 return -1;
1135             }
1136             /* break out of this for loop if the error string is set */
1137             if (rrd_test_error()) {
1138                 return -1;
1139             }
1140             /* make sure pdp_temp is neither too large or too small
1141              * if any of these occur it becomes unknown ...
1142              * sorry folks ... */
1143             if (!isnan(rate) &&
1144                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148                 pdp_new[ds_idx] = DNAN;
1149             }
1150         } else {
1151             /* no news is news all the same */
1152             pdp_new[ds_idx] = DNAN;
1153         }
1154
1155
1156         /* make a copy of the command line argument for the next run */
1157 #ifdef DEBUG
1158         fprintf(stderr, "prep ds[%lu]\t"
1159                 "last_arg '%s'\t"
1160                 "this_arg '%s'\t"
1161                 "pdp_new %10.2f\n",
1162                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163                 pdp_new[ds_idx]);
1164 #endif
1165         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166                 LAST_DS_LEN - 1);
1167         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1168     }
1169     return 0;
1170 }
1171
1172 /*
1173  * How many PDP steps have elapsed since the last update? Returns the answer,
1174  * and stores the time between the last update and the last PDP in pre_time,
1175  * and the time between the last PDP and the current time in post_int.
1176  */
1177 static int calculate_elapsed_steps(
1178     rrd_t *rrd,
1179     unsigned long current_time,
1180     unsigned long current_time_usec,
1181     double interval,
1182     double *pre_int,
1183     double *post_int,
1184     unsigned long *proc_pdp_cnt)
1185 {
1186     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1187     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1188                                  * time */
1189     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1190                                  * when it was last updated */
1191     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1192
1193     /* when was the current pdp started */
1194     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1196
1197     /* when did the last pdp_st occur */
1198     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199     occu_pdp_st = current_time - occu_pdp_age;
1200
1201     if (occu_pdp_st > proc_pdp_st) {
1202         /* OK we passed the pdp_st moment */
1203         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1204                                                                      * occurred before the latest
1205                                                                      * pdp_st moment*/
1206         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207         *post_int = occu_pdp_age;   /* how much after it */
1208         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1209     } else {
1210         *pre_int = interval;
1211         *post_int = 0;
1212     }
1213
1214     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1215
1216 #ifdef DEBUG
1217     printf("proc_pdp_age %lu\t"
1218            "proc_pdp_st %lu\t"
1219            "occu_pfp_age %lu\t"
1220            "occu_pdp_st %lu\t"
1221            "int %lf\t"
1222            "pre_int %lf\t"
1223            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1225 #endif
1226
1227     /* compute the number of elapsed pdp_st moments */
1228     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1229 }
1230
1231 /*
1232  * Increment the PDP values by the values in pdp_new, or else initialize them.
1233  */
1234 static void simple_update(
1235     rrd_t *rrd,
1236     double interval,
1237     rrd_value_t *pdp_new)
1238 {
1239     int       i;
1240
1241     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242         if (isnan(pdp_new[i])) {
1243             /* this is not really accurate if we use subsecond data arrival time
1244                should have thought of it when going subsecond resolution ...
1245                sorry next format change we will have it! */
1246             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1247                 floor(interval);
1248         } else {
1249             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1251             } else {
1252                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1253             }
1254         }
1255 #ifdef DEBUG
1256         fprintf(stderr,
1257                 "NO PDP  ds[%i]\t"
1258                 "value %10.2f\t"
1259                 "unkn_sec %5lu\n",
1260                 i,
1261                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1263 #endif
1264     }
1265 }
1266
1267 /*
1268  * Call process_pdp_st for each DS.
1269  *
1270  * Returns 0 on success, -1 on error.
1271  */
1272 static int process_all_pdp_st(
1273     rrd_t *rrd,
1274     double interval,
1275     double pre_int,
1276     double post_int,
1277     unsigned long elapsed_pdp_st,
1278     rrd_value_t *pdp_new,
1279     rrd_value_t *pdp_temp)
1280 {
1281     unsigned long ds_idx;
1282
1283     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284        rate*seconds which occurred up to the last run.
1285        pdp_new[] contains rate*seconds from the latest run.
1286        pdp_temp[] will contain the rate for cdp */
1287
1288     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1291                            pdp_new, pdp_temp) == -1) {
1292             return -1;
1293         }
1294 #ifdef DEBUG
1295         fprintf(stderr, "PDP UPD ds[%lu]\t"
1296                 "elapsed_pdp_st %lu\t"
1297                 "pdp_temp %10.2f\t"
1298                 "new_prep %10.2f\t"
1299                 "new_unkn_sec %5lu\n",
1300                 ds_idx,
1301                 elapsed_pdp_st,
1302                 pdp_temp[ds_idx],
1303                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1305 #endif
1306     }
1307     return 0;
1308 }
1309
1310 /*
1311  * Process an update that occurs after one of the PDP moments.
1312  * Increments the PDP value, sets NAN if time greater than the
1313  * heartbeats have elapsed, processes CDEFs.
1314  *
1315  * Returns 0 on success, -1 on error.
1316  */
1317 static int process_pdp_st(
1318     rrd_t *rrd,
1319     unsigned long ds_idx,
1320     double interval,
1321     double pre_int,
1322     double post_int,
1323     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1324     rrd_value_t *pdp_new,
1325     rrd_value_t *pdp_temp)
1326 {
1327     int       i;
1328
1329     /* update pdp_prep to the current pdp_st. */
1330     double    pre_unknown = 0.0;
1331     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1332     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1333
1334     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1335
1336     rpnstack_init(&rpnstack);
1337
1338
1339     if (isnan(pdp_new[ds_idx])) {
1340         /* a final bit of unknown to be added before calculation
1341            we use a temporary variable for this so that we
1342            don't have to turn integer lines before using the value */
1343         pre_unknown = pre_int;
1344     } else {
1345         if (isnan(scratch[PDP_val].u_val)) {
1346             scratch[PDP_val].u_val = 0;
1347         }
1348         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1349     }
1350
1351     /* if too much of the pdp_prep is unknown we dump it */
1352     /* if the interval is larger thatn mrhb we get NAN */
1353     if ((interval > mrhb) ||
1354         (rrd->stat_head->pdp_step / 2.0 <
1355          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356         pdp_temp[ds_idx] = DNAN;
1357     } else {
1358         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1360              pre_unknown);
1361     }
1362
1363     /* process CDEF data sources; remember each CDEF DS can
1364      * only reference other DS with a lower index number */
1365     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1366         rpnp_t   *rpnp;
1367
1368         rpnp =
1369             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1370         if(rpnp == NULL) {
1371           rpnstack_free(&rpnstack);
1372           return -1;
1373         }
1374         /* substitute data values for OP_VARIABLE nodes */
1375         for (i = 0; rpnp[i].op != OP_END; i++) {
1376             if (rpnp[i].op == OP_VARIABLE) {
1377                 rpnp[i].op = OP_NUMBER;
1378                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1379             }
1380         }
1381         /* run the rpn calculator */
1382         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1383             free(rpnp);
1384             rpnstack_free(&rpnstack);
1385             return -1;
1386         }
1387         free(rpnp);
1388     }
1389
1390     /* make pdp_prep ready for the next run */
1391     if (isnan(pdp_new[ds_idx])) {
1392         /* this is not realy accurate if we use subsecond data arival time
1393            should have thought of it when going subsecond resolution ...
1394            sorry next format change we will have it! */
1395         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1396         scratch[PDP_val].u_val = DNAN;
1397     } else {
1398         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1399         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1400     }
1401     rpnstack_free(&rpnstack);
1402     return 0;
1403 }
1404
1405 /*
1406  * Iterate over all the RRAs for a given DS and:
1407  * 1. Decide whether to schedule a smooth later
1408  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1409  * 3. Update the CDP
1410  *
1411  * Returns 0 on success, -1 on error
1412  */
1413 static int update_all_cdp_prep(
1414     rrd_t *rrd,
1415     unsigned long *rra_step_cnt,
1416     unsigned long rra_begin,
1417     rrd_file_t *rrd_file,
1418     unsigned long elapsed_pdp_st,
1419     unsigned long proc_pdp_cnt,
1420     rrd_value_t **last_seasonal_coef,
1421     rrd_value_t **seasonal_coef,
1422     rrd_value_t *pdp_temp,
1423     unsigned long *skip_update,
1424     int *schedule_smooth)
1425 {
1426     unsigned long rra_idx;
1427
1428     /* index into the CDP scratch array */
1429     enum cf_en current_cf;
1430     unsigned long rra_start;
1431
1432     /* number of rows to be updated in an RRA for a data value. */
1433     unsigned long start_pdp_offset;
1434
1435     rra_start = rra_begin;
1436     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1437         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1438         start_pdp_offset =
1439             rrd->rra_def[rra_idx].pdp_cnt -
1440             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1441         skip_update[rra_idx] = 0;
1442         if (start_pdp_offset <= elapsed_pdp_st) {
1443             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1444                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1445         } else {
1446             rra_step_cnt[rra_idx] = 0;
1447         }
1448
1449         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1450             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1451              * so that they will be correct for the next observed value; note that for
1452              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1453              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1454             if (rra_step_cnt[rra_idx] > 1) {
1455                 skip_update[rra_idx] = 1;
1456                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1457                                 elapsed_pdp_st, last_seasonal_coef);
1458                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1459                                 elapsed_pdp_st + 1, seasonal_coef);
1460             }
1461             /* periodically run a smoother for seasonal effects */
1462             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1463 #ifdef DEBUG
1464                 fprintf(stderr,
1465                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1466                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1467                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1468                         u_cnt);
1469 #endif
1470                 *schedule_smooth = 1;
1471             }
1472         }
1473         if (rrd_test_error())
1474             return -1;
1475
1476         if (update_cdp_prep
1477             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1478              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1479              current_cf) == -1) {
1480             return -1;
1481         }
1482         rra_start +=
1483             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1484             sizeof(rrd_value_t);
1485     }
1486     return 0;
1487 }
1488
1489 /* 
1490  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1491  */
1492 static int do_schedule_smooth(
1493     rrd_t *rrd,
1494     unsigned long rra_idx,
1495     unsigned long elapsed_pdp_st)
1496 {
1497     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1498     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1499     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1500     unsigned long seasonal_smooth_idx =
1501         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1502     unsigned long *init_seasonal =
1503         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1504
1505     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1506      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1507      * really an RRA level, not a data source within RRA level parameter, but
1508      * the rra_def is read only for rrd_update (not flushed to disk). */
1509     if (*init_seasonal > BURNIN_CYCLES) {
1510         /* someone has no doubt invented a trick to deal with this wrap around,
1511          * but at least this code is clear. */
1512         if (seasonal_smooth_idx > cur_row) {
1513             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1514              * between PDP and CDP */
1515             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1516         }
1517         /* can't rely on negative numbers because we are working with
1518          * unsigned values */
1519         return (cur_row + elapsed_pdp_st >= row_cnt
1520                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1521     }
1522     /* mark off one of the burn-in cycles */
1523     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1524 }
1525
1526 /*
1527  * For a given RRA, iterate over the data sources and call the appropriate
1528  * consolidation function.
1529  *
1530  * Returns 0 on success, -1 on error.
1531  */
1532 static int update_cdp_prep(
1533     rrd_t *rrd,
1534     unsigned long elapsed_pdp_st,
1535     unsigned long start_pdp_offset,
1536     unsigned long *rra_step_cnt,
1537     int rra_idx,
1538     rrd_value_t *pdp_temp,
1539     rrd_value_t *last_seasonal_coef,
1540     rrd_value_t *seasonal_coef,
1541     int current_cf)
1542 {
1543     unsigned long ds_idx, cdp_idx;
1544
1545     /* update CDP_PREP areas */
1546     /* loop over data soures within each RRA */
1547     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1548
1549         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1550
1551         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1552             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1553                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1554                        elapsed_pdp_st, start_pdp_offset,
1555                        rrd->rra_def[rra_idx].pdp_cnt,
1556                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1557                        rra_idx, ds_idx);
1558         } else {
1559             /* Nothing to consolidate if there's one PDP per CDP. However, if
1560              * we've missed some PDPs, let's update null counters etc. */
1561             if (elapsed_pdp_st > 2) {
1562                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1563                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1564                           (enum cf_en)current_cf);
1565             }
1566         }
1567
1568         if (rrd_test_error())
1569             return -1;
1570     }                   /* endif data sources loop */
1571     return 0;
1572 }
1573
1574 /*
1575  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1576  * primary value, secondary value, and # of unknowns.
1577  */
1578 static void update_cdp(
1579     unival *scratch,
1580     int current_cf,
1581     rrd_value_t pdp_temp_val,
1582     unsigned long rra_step_cnt,
1583     unsigned long elapsed_pdp_st,
1584     unsigned long start_pdp_offset,
1585     unsigned long pdp_cnt,
1586     rrd_value_t xff,
1587     int i,
1588     int ii)
1589 {
1590     /* shorthand variables */
1591     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1592     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1593     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1594     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1595
1596     if (rra_step_cnt) {
1597         /* If we are in this block, as least 1 CDP value will be written to
1598          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1599          * to be written, then the "fill in" value is the CDP_secondary_val
1600          * entry. */
1601         if (isnan(pdp_temp_val)) {
1602             *cdp_unkn_pdp_cnt += start_pdp_offset;
1603             *cdp_secondary_val = DNAN;
1604         } else {
1605             /* CDP_secondary value is the RRA "fill in" value for intermediary
1606              * CDP data entries. No matter the CF, the value is the same because
1607              * the average, max, min, and last of a list of identical values is
1608              * the same, namely, the value itself. */
1609             *cdp_secondary_val = pdp_temp_val;
1610         }
1611
1612         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1613             *cdp_primary_val = DNAN;
1614         } else {
1615             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1616                                start_pdp_offset, pdp_cnt);
1617         }
1618         *cdp_val =
1619             initialize_carry_over(pdp_temp_val,current_cf,
1620                                   elapsed_pdp_st,
1621                                   start_pdp_offset, pdp_cnt);
1622                /* endif meets xff value requirement for a valid value */
1623         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1624          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1625         if (isnan(pdp_temp_val))
1626             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1627         else
1628             *cdp_unkn_pdp_cnt = 0;
1629     } else {            /* rra_step_cnt[i]  == 0 */
1630
1631 #ifdef DEBUG
1632         if (isnan(*cdp_val)) {
1633             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1634                     i, ii);
1635         } else {
1636             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1637                     i, ii, *cdp_val);
1638         }
1639 #endif
1640         if (isnan(pdp_temp_val)) {
1641             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1642         } else {
1643             *cdp_val =
1644                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1645                                   current_cf, i, ii);
1646         }
1647     }
1648 }
1649
1650 /*
1651  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1652  * on the type of consolidation function.
1653  */
1654 static void initialize_cdp_val(
1655     unival *scratch,
1656     int current_cf,
1657     rrd_value_t pdp_temp_val,
1658     unsigned long start_pdp_offset,
1659     unsigned long pdp_cnt)
1660 {
1661     rrd_value_t cum_val, cur_val;
1662
1663     switch (current_cf) {
1664     case CF_AVERAGE:
1665         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1666         cur_val = IFDNAN(pdp_temp_val, 0.0);
1667         scratch[CDP_primary_val].u_val =
1668             (cum_val + cur_val * start_pdp_offset) /
1669             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1670         break;
1671     case CF_MAXIMUM: 
1672         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1673         cur_val = IFDNAN(pdp_temp_val, -DINF);
1674
1675 #if 0
1676 #ifdef DEBUG
1677         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1678             fprintf(stderr,
1679                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1680                     i, ii);
1681             exit(-1);
1682         }
1683 #endif
1684 #endif
1685         if (cur_val > cum_val)
1686             scratch[CDP_primary_val].u_val = cur_val;
1687         else
1688             scratch[CDP_primary_val].u_val = cum_val;
1689         break;
1690     case CF_MINIMUM:
1691         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1692         cur_val = IFDNAN(pdp_temp_val, DINF);
1693 #if 0
1694 #ifdef DEBUG
1695         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1696             fprintf(stderr,
1697                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1698                     ii);
1699             exit(-1);
1700         }
1701 #endif
1702 #endif
1703         if (cur_val < cum_val)
1704             scratch[CDP_primary_val].u_val = cur_val;
1705         else
1706             scratch[CDP_primary_val].u_val = cum_val;
1707         break;
1708     case CF_LAST:
1709     default:
1710         scratch[CDP_primary_val].u_val = pdp_temp_val;
1711         break;
1712     }
1713 }
1714
1715 /*
1716  * Update the consolidation function for Holt-Winters functions as
1717  * well as other functions that don't actually consolidate multiple
1718  * PDPs.
1719  */
1720 static void reset_cdp(
1721     rrd_t *rrd,
1722     unsigned long elapsed_pdp_st,
1723     rrd_value_t *pdp_temp,
1724     rrd_value_t *last_seasonal_coef,
1725     rrd_value_t *seasonal_coef,
1726     int rra_idx,
1727     int ds_idx,
1728     int cdp_idx,
1729     enum cf_en current_cf)
1730 {
1731     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1732
1733     switch (current_cf) {
1734     case CF_AVERAGE:
1735     default:
1736         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1737         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1738         break;
1739     case CF_SEASONAL:
1740     case CF_DEVSEASONAL:
1741         /* need to update cached seasonal values, so they are consistent
1742          * with the bulk update */
1743         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1744          * CDP_last_deviation are the same. */
1745         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1746         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1747         break;
1748     case CF_HWPREDICT:
1749     case CF_MHWPREDICT:
1750         /* need to update the null_count and last_null_count.
1751          * even do this for non-DNAN pdp_temp because the
1752          * algorithm is not learning from batch updates. */
1753         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1754         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1755         /* fall through */
1756     case CF_DEVPREDICT:
1757         scratch[CDP_primary_val].u_val = DNAN;
1758         scratch[CDP_secondary_val].u_val = DNAN;
1759         break;
1760     case CF_FAILURES:
1761         /* do not count missed bulk values as failures */
1762         scratch[CDP_primary_val].u_val = 0;
1763         scratch[CDP_secondary_val].u_val = 0;
1764         /* need to reset violations buffer.
1765          * could do this more carefully, but for now, just
1766          * assume a bulk update wipes away all violations. */
1767         erase_violations(rrd, cdp_idx, rra_idx);
1768         break;
1769     }
1770 }
1771
1772 static rrd_value_t initialize_carry_over(
1773     rrd_value_t pdp_temp_val,
1774     int current_cf,
1775     unsigned long elapsed_pdp_st,
1776     unsigned long start_pdp_offset,
1777     unsigned long pdp_cnt)
1778 {
1779     unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1780     if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1781         switch (current_cf) {
1782         case CF_MAXIMUM:
1783             return -DINF;
1784         case CF_MINIMUM:
1785             return DINF;
1786         case CF_AVERAGE:
1787             return 0;
1788         default:
1789             return DNAN;
1790         }        
1791     } 
1792     else {
1793         switch (current_cf) {
1794         case CF_AVERAGE:
1795             return pdp_temp_val *  pdp_into_cdp_cnt ;
1796         default:
1797             return pdp_temp_val;
1798         }        
1799     }        
1800 }
1801
1802 /*
1803  * Update or initialize a CDP value based on the consolidation
1804  * function.
1805  *
1806  * Returns the new value.
1807  */
1808 static rrd_value_t calculate_cdp_val(
1809     rrd_value_t cdp_val,
1810     rrd_value_t pdp_temp_val,
1811     unsigned long elapsed_pdp_st,
1812     int current_cf,
1813 #ifdef DEBUG
1814     int i,
1815     int ii
1816 #else
1817     int UNUSED(i),
1818     int UNUSED(ii)
1819 #endif
1820     )
1821 {
1822     if (isnan(cdp_val)) {
1823         if (current_cf == CF_AVERAGE) {
1824             pdp_temp_val *= elapsed_pdp_st;
1825         }
1826 #ifdef DEBUG
1827         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1828                 i, ii, pdp_temp_val);
1829 #endif
1830         return pdp_temp_val;
1831     }
1832     if (current_cf == CF_AVERAGE)
1833         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1834     if (current_cf == CF_MINIMUM)
1835         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1836     if (current_cf == CF_MAXIMUM)
1837         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1838
1839     return pdp_temp_val;
1840 }
1841
1842 /*
1843  * For each RRA, update the seasonal values and then call update_aberrant_CF
1844  * for each data source.
1845  *
1846  * Return 0 on success, -1 on error.
1847  */
1848 static int update_aberrant_cdps(
1849     rrd_t *rrd,
1850     rrd_file_t *rrd_file,
1851     unsigned long rra_begin,
1852     unsigned long elapsed_pdp_st,
1853     rrd_value_t *pdp_temp,
1854     rrd_value_t **seasonal_coef)
1855 {
1856     unsigned long rra_idx, ds_idx, j;
1857
1858     /* number of PDP steps since the last update that
1859      * are assigned to the first CDP to be generated
1860      * since the last update. */
1861     unsigned short scratch_idx;
1862     unsigned long rra_start;
1863     enum cf_en current_cf;
1864
1865     /* this loop is only entered if elapsed_pdp_st < 3 */
1866     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1867          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1868         rra_start = rra_begin;
1869         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1870             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1871                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1872                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1873                     if (scratch_idx == CDP_primary_val) {
1874                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875                                         elapsed_pdp_st + 1, seasonal_coef);
1876                     } else {
1877                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1878                                         elapsed_pdp_st + 2, seasonal_coef);
1879                     }
1880                 }
1881                 if (rrd_test_error())
1882                     return -1;
1883                 /* loop over data soures within each RRA */
1884                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1885                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1886                                        rra_idx * (rrd->stat_head->ds_cnt) +
1887                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1888                                        *seasonal_coef);
1889                 }
1890             }
1891             rra_start += rrd->rra_def[rra_idx].row_cnt
1892                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1893         }
1894     }
1895     return 0;
1896 }
1897
1898 /* 
1899  * Move sequentially through the file, writing one RRA at a time.  Note this
1900  * architecture divorces the computation of CDP with flushing updated RRA
1901  * entries to disk.
1902  *
1903  * Return 0 on success, -1 on error.
1904  */
1905 static int write_to_rras(
1906     rrd_t *rrd,
1907     rrd_file_t *rrd_file,
1908     unsigned long *rra_step_cnt,
1909     unsigned long rra_begin,
1910     time_t current_time,
1911     unsigned long *skip_update,
1912     rrd_info_t ** pcdp_summary)
1913 {
1914     unsigned long rra_idx;
1915     unsigned long rra_start;
1916     time_t    rra_time = 0; /* time of update for a RRA */
1917
1918     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1919     
1920     /* Ready to write to disk */
1921     rra_start = rra_begin;
1922
1923     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1924         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1925         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1926
1927         /* for cdp_prep */
1928         unsigned short scratch_idx;
1929         unsigned long step_subtract;
1930
1931         for (scratch_idx = CDP_primary_val,
1932                  step_subtract = 1;
1933              rra_step_cnt[rra_idx] > 0;
1934              rra_step_cnt[rra_idx]--,
1935                  scratch_idx = CDP_secondary_val,
1936                  step_subtract = 2) {
1937
1938             size_t rra_pos_new;
1939 #ifdef DEBUG
1940             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1941 #endif
1942             /* increment, with wrap-around */
1943             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1944               rra_ptr->cur_row = 0;
1945
1946             /* we know what our position should be */
1947             rra_pos_new = rra_start
1948               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1949
1950             /* re-seek if the position is wrong or we wrapped around */
1951             if ((size_t)rra_pos_new != rrd_file->pos) {
1952                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1953                     rrd_set_error("seek error in rrd");
1954                     return -1;
1955                 }
1956             }
1957 #ifdef DEBUG
1958             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1959 #endif
1960
1961             if (skip_update[rra_idx])
1962                 continue;
1963
1964             if (*pcdp_summary != NULL) {
1965                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1966
1967                 rra_time = (current_time - current_time % step_time)
1968                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1969             }
1970
1971             if (write_RRA_row
1972                 (rrd_file, rrd, rra_idx, scratch_idx,
1973                  pcdp_summary, rra_time) == -1)
1974                 return -1;
1975
1976             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1977         }
1978
1979         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1980     } /* RRA LOOP */
1981
1982     return 0;
1983 }
1984
1985 /*
1986  * Write out one row of values (one value per DS) to the archive.
1987  *
1988  * Returns 0 on success, -1 on error.
1989  */
1990 static int write_RRA_row(
1991     rrd_file_t *rrd_file,
1992     rrd_t *rrd,
1993     unsigned long rra_idx,
1994     unsigned short CDP_scratch_idx,
1995     rrd_info_t ** pcdp_summary,
1996     time_t rra_time)
1997 {
1998     unsigned long ds_idx, cdp_idx;
1999     rrd_infoval_t iv;
2000
2001     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2002         /* compute the cdp index */
2003         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2004 #ifdef DEBUG
2005         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2006                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2007                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2008 #endif
2009         if (*pcdp_summary != NULL) {
2010             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2011             /* append info to the return hash */
2012             *pcdp_summary = rrd_info_push(*pcdp_summary,
2013                                           sprintf_alloc
2014                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
2015                                            (long long)rra_time,
2016                                            rrd->rra_def[rra_idx].cf_nam,
2017                                            rrd->rra_def[rra_idx].pdp_cnt,
2018                                            rrd->ds_def[ds_idx].ds_nam),
2019                                            RD_I_VAL, iv);
2020         }
2021         errno = 0;
2022         if (rrd_write(rrd_file,
2023                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2024                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2025             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2026             return -1;
2027         }
2028     }
2029     return 0;
2030 }
2031
2032 /*
2033  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2034  *
2035  * Returns 0 on success, -1 otherwise
2036  */
2037 static int smooth_all_rras(
2038     rrd_t *rrd,
2039     rrd_file_t *rrd_file,
2040     unsigned long rra_begin)
2041 {
2042     unsigned long rra_start = rra_begin;
2043     unsigned long rra_idx;
2044
2045     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2046         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2047             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2048 #ifdef DEBUG
2049             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2050 #endif
2051             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2052             if (rrd_test_error())
2053                 return -1;
2054         }
2055         rra_start += rrd->rra_def[rra_idx].row_cnt
2056             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2057     }
2058     return 0;
2059 }
2060
2061 #ifndef HAVE_MMAP
2062 /*
2063  * Flush changes to disk (unless we're using mmap)
2064  *
2065  * Returns 0 on success, -1 otherwise
2066  */
2067 static int write_changes_to_disk(
2068     rrd_t *rrd,
2069     rrd_file_t *rrd_file,
2070     int version)
2071 {
2072     /* we just need to write back the live header portion now */
2073     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2074                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2075                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2076                  SEEK_SET) != 0) {
2077         rrd_set_error("seek rrd for live header writeback");
2078         return -1;
2079     }
2080     if (version >= 3) {
2081         if (rrd_write(rrd_file, rrd->live_head,
2082                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2083             rrd_set_error("rrd_write live_head to rrd");
2084             return -1;
2085         }
2086     } else {
2087         if (rrd_write(rrd_file, rrd->legacy_last_up,
2088                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2089             rrd_set_error("rrd_write live_head to rrd");
2090             return -1;
2091         }
2092     }
2093
2094
2095     if (rrd_write(rrd_file, rrd->pdp_prep,
2096                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2097         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2098         rrd_set_error("rrd_write pdp_prep to rrd");
2099         return -1;
2100     }
2101
2102     if (rrd_write(rrd_file, rrd->cdp_prep,
2103                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2104                   rrd->stat_head->ds_cnt)
2105         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2106                       rrd->stat_head->ds_cnt)) {
2107
2108         rrd_set_error("rrd_write cdp_prep to rrd");
2109         return -1;
2110     }
2111
2112     if (rrd_write(rrd_file, rrd->rra_ptr,
2113                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2114         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2115         rrd_set_error("rrd_write rra_ptr to rrd");
2116         return -1;
2117     }
2118     return 0;
2119 }
2120 #endif