* patches to make rrdtool compile on win32 (trunk and 1.4)
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.3  Copyright by Tobi Oetiker, 1997-2010
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #include "rrd_client.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
63
64 /* FUNCTION PROTOTYPES */
65
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
77
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
88
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
94
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
112
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
122
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
130
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
136
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
145
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
150
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
159
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
169
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
217
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
228
229 static rrd_value_t initialize_carry_over(
230     rrd_value_t pdp_temp_val,
231     int         current_cf,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
251
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
260
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
268
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
273
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
280
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
287 {
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
292 }
293
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
302 {
303     struct timeval tmp_time;    /* used for time conversion */
304
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
313 }
314
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
316
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
320 {
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
329
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
333
334     while (1) {
335         int       option_index = 0;
336         int       opt;
337
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339
340         if (opt == EOF)
341             break;
342
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
347
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
353
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
362
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
376 }
377
378 int rrd_update(
379     int argc,
380     char **argv)
381 {
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
392
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
395
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
398
399         if (opt == EOF)
400             break;
401
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
406
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
417
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
423
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
429
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) return status;
433     }
434
435     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436     {
437         rrd_set_error("The caching daemon cannot be used together with "
438                 "templates yet.");
439         goto out;
440     }
441
442     if (! rrdc_is_connected(opt_daemon))
443     {
444       rc = rrd_update_r(argv[optind], tmplt,
445                         argc - optind - 1, (const char **) (argv + optind + 1));
446     }
447     else /* we are connected */
448     {
449         rc = rrdc_update (argv[optind], /* file */
450                           argc - optind - 1, /* values_num */
451                           (const char *const *) (argv + optind + 1)); /* values */
452         if (rc > 0)
453             rrd_set_error("Failed sending the values to rrdcached: %s",
454                           rrd_strerror (rc));
455     }
456
457   out:
458     if (tmplt != NULL)
459     {
460         free(tmplt);
461         tmplt = NULL;
462     }
463     if (opt_daemon != NULL)
464     {
465         free (opt_daemon);
466         opt_daemon = NULL;
467     }
468     return rc;
469 }
470
471 int rrd_update_r(
472     const char *filename,
473     const char *tmplt,
474     int argc,
475     const char **argv)
476 {
477     return _rrd_update(filename, tmplt, argc, argv, NULL);
478 }
479
480 int rrd_update_v_r(
481     const char *filename,
482     const char *tmplt,
483     int argc,
484     const char **argv,
485     rrd_info_t * pcdp_summary)
486 {
487     return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
488 }
489
490 int _rrd_update(
491     const char *filename,
492     const char *tmplt,
493     int argc,
494     const char **argv,
495     rrd_info_t * pcdp_summary)
496 {
497
498     int       arg_i = 2;
499
500     unsigned long rra_begin;    /* byte pointer to the rra
501                                  * area in the rrd file.  this
502                                  * pointer never changes value */
503     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
504                              * to the existing entry */
505     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
506                              * to the cdp values */
507
508     long     *tmpl_idx; /* index representing the settings
509                          * transported by the tmplt index */
510     unsigned long tmpl_cnt = 2; /* time and data */
511     rrd_t     rrd;
512     time_t    current_time = 0;
513     unsigned long current_time_usec = 0;    /* microseconds part of current time */
514     char    **updvals;
515     int       schedule_smooth = 0;
516
517     /* number of elapsed PDP steps since last update */
518     unsigned long *rra_step_cnt = NULL;
519
520     int       version;  /* rrd version */
521     rrd_file_t *rrd_file;
522     char     *arg_copy; /* for processing the argv */
523     unsigned long *skip_update; /* RRAs to advance but not write */
524
525     /* need at least 1 arguments: data. */
526     if (argc < 1) {
527         rrd_set_error("Not enough arguments");
528         goto err_out;
529     }
530
531     rrd_init(&rrd);
532     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533         goto err_free;
534     }
535     /* We are now at the beginning of the rra's */
536     rra_begin = rrd_file->header_len;
537
538     version = atoi(rrd.stat_head->version);
539
540     initialize_time(&current_time, &current_time_usec, version);
541
542     /* get exclusive lock to whole file.
543      * lock gets removed when we close the file.
544      */
545     if (rrd_lock(rrd_file) != 0) {
546         rrd_set_error("could not lock RRD");
547         goto err_close;
548     }
549
550     if (allocate_data_structures(&rrd, &updvals,
551                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552                                  &rra_step_cnt, &skip_update,
553                                  &pdp_new) == -1) {
554         goto err_close;
555     }
556
557     /* loop through the arguments. */
558     for (arg_i = 0; arg_i < argc; arg_i++) {
559         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560             rrd_set_error("failed duplication argv entry");
561             break;
562         }
563         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
564                         &current_time, &current_time_usec, pdp_temp, pdp_new,
565                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566                         &pcdp_summary, version, skip_update,
567                         &schedule_smooth) == -1) {
568             if (rrd_test_error()) { /* Should have error string always here */
569                 char     *save_error;
570
571                 /* Prepend file name to error message */
572                 if ((save_error = strdup(rrd_get_error())) != NULL) {
573                     rrd_set_error("%s: %s", filename, save_error);
574                     free(save_error);
575                 }
576             }
577             free(arg_copy);
578             break;
579         }
580         free(arg_copy);
581     }
582
583     free(rra_step_cnt);
584
585     /* if we got here and if there is an error and if the file has not been
586      * written to, then close things up and return. */
587     if (rrd_test_error()) {
588         goto err_free_structures;
589     }
590 #ifndef HAVE_MMAP
591     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592         goto err_free_structures;
593     }
594 #endif
595
596     /* calling the smoothing code here guarantees at most one smoothing
597      * operation per rrd_update call. Unfortunately, it is possible with bulk
598      * updates, or a long-delayed update for smoothing to occur off-schedule.
599      * This really isn't critical except during the burn-in cycles. */
600     if (schedule_smooth) {
601         smooth_all_rras(&rrd, rrd_file, rra_begin);
602     }
603
604 /*    rrd_dontneed(rrd_file,&rrd); */
605     rrd_free(&rrd);
606     rrd_close(rrd_file);
607
608     free(pdp_new);
609     free(tmpl_idx);
610     free(pdp_temp);
611     free(skip_update);
612     free(updvals);
613     return 0;
614
615   err_free_structures:
616     free(pdp_new);
617     free(tmpl_idx);
618     free(pdp_temp);
619     free(skip_update);
620     free(updvals);
621   err_close:
622     rrd_close(rrd_file);
623   err_free:
624     rrd_free(&rrd);
625   err_out:
626     return -1;
627 }
628
629 /*
630  * Allocate some important arrays used, and initialize the template.
631  *
632  * When it returns, either all of the structures are allocated
633  * or none of them are.
634  *
635  * Returns 0 on success, -1 on error.
636  */
637 static int allocate_data_structures(
638     rrd_t *rrd,
639     char ***updvals,
640     rrd_value_t **pdp_temp,
641     const char *tmplt,
642     long **tmpl_idx,
643     unsigned long *tmpl_cnt,
644     unsigned long **rra_step_cnt,
645     unsigned long **skip_update,
646     rrd_value_t **pdp_new)
647 {
648     unsigned  i, ii;
649     if ((*updvals = (char **) malloc(sizeof(char *)
650                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651         rrd_set_error("allocating updvals pointer array.");
652         return -1;
653     }
654     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655                                             * rrd->stat_head->ds_cnt)) ==
656         NULL) {
657         rrd_set_error("allocating pdp_temp.");
658         goto err_free_updvals;
659     }
660     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
661                                                  *
662                                                  rrd->stat_head->rra_cnt)) ==
663         NULL) {
664         rrd_set_error("allocating skip_update.");
665         goto err_free_pdp_temp;
666     }
667     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669         rrd_set_error("allocating tmpl_idx.");
670         goto err_free_skip_update;
671     }
672     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
673                                                   *
674                                                   (rrd->stat_head->
675                                                    rra_cnt))) == NULL) {
676         rrd_set_error("allocating rra_step_cnt.");
677         goto err_free_tmpl_idx;
678     }
679
680     /* initialize tmplt redirector */
681     /* default config example (assume DS 1 is a CDEF DS)
682        tmpl_idx[0] -> 0; (time)
683        tmpl_idx[1] -> 1; (DS 0)
684        tmpl_idx[2] -> 3; (DS 2)
685        tmpl_idx[3] -> 4; (DS 3) */
686     (*tmpl_idx)[0] = 0; /* time */
687     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689             (*tmpl_idx)[ii++] = i;
690     }
691     *tmpl_cnt = ii;
692
693     if (tmplt != NULL) {
694         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695             goto err_free_rra_step_cnt;
696         }
697     }
698
699     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700                                            * rrd->stat_head->ds_cnt)) == NULL) {
701         rrd_set_error("allocating pdp_new.");
702         goto err_free_rra_step_cnt;
703     }
704
705     return 0;
706
707   err_free_rra_step_cnt:
708     free(*rra_step_cnt);
709   err_free_tmpl_idx:
710     free(*tmpl_idx);
711   err_free_skip_update:
712     free(*skip_update);
713   err_free_pdp_temp:
714     free(*pdp_temp);
715   err_free_updvals:
716     free(*updvals);
717     return -1;
718 }
719
720 /*
721  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
722  *
723  * Returns 0 on success.
724  */
725 static int parse_template(
726     rrd_t *rrd,
727     const char *tmplt,
728     unsigned long *tmpl_cnt,
729     long *tmpl_idx)
730 {
731     char     *dsname, *tmplt_copy;
732     unsigned int tmpl_len, i;
733     int       ret = 0;
734
735     *tmpl_cnt = 1;      /* the first entry is the time */
736
737     /* we should work on a writeable copy here */
738     if ((tmplt_copy = strdup(tmplt)) == NULL) {
739         rrd_set_error("error copying tmplt '%s'", tmplt);
740         ret = -1;
741         goto out;
742     }
743
744     dsname = tmplt_copy;
745     tmpl_len = strlen(tmplt_copy);
746     for (i = 0; i <= tmpl_len; i++) {
747         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748             tmplt_copy[i] = '\0';
749             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750                 rrd_set_error("tmplt contains more DS definitions than RRD");
751                 ret = -1;
752                 goto out_free_tmpl_copy;
753             }
754             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755                 rrd_set_error("unknown DS name '%s'", dsname);
756                 ret = -1;
757                 goto out_free_tmpl_copy;
758             }
759             /* go to the next entry on the tmplt_copy */
760             if (i < tmpl_len)
761                 dsname = &tmplt_copy[i + 1];
762         }
763     }
764   out_free_tmpl_copy:
765     free(tmplt_copy);
766   out:
767     return ret;
768 }
769
770 /*
771  * Parse an update string, updates the primary data points (PDPs)
772  * and consolidated data points (CDPs), and writes changes to the RRAs.
773  *
774  * Returns 0 on success, -1 on error.
775  */
776 static int process_arg(
777     char *step_start,
778     rrd_t *rrd,
779     rrd_file_t *rrd_file,
780     unsigned long rra_begin,
781     time_t *current_time,
782     unsigned long *current_time_usec,
783     rrd_value_t *pdp_temp,
784     rrd_value_t *pdp_new,
785     unsigned long *rra_step_cnt,
786     char **updvals,
787     long *tmpl_idx,
788     unsigned long tmpl_cnt,
789     rrd_info_t ** pcdp_summary,
790     int version,
791     unsigned long *skip_update,
792     int *schedule_smooth)
793 {
794     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
795
796     /* a vector of future Holt-Winters seasonal coefs */
797     unsigned long elapsed_pdp_st;
798
799     double    interval, pre_int, post_int;  /* interval between this and
800                                              * the last run */
801     unsigned long proc_pdp_cnt;
802
803     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804                  current_time, current_time_usec, version) == -1) {
805         return -1;
806     }
807
808     interval = (double) (*current_time - rrd->live_head->last_up)
809         + (double) ((long) *current_time_usec -
810                     (long) rrd->live_head->last_up_usec) / 1e6f;
811
812     /* process the data sources and update the pdp_prep 
813      * area accordingly */
814     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
815         return -1;
816     }
817
818     elapsed_pdp_st = calculate_elapsed_steps(rrd,
819                                              *current_time,
820                                              *current_time_usec, interval,
821                                              &pre_int, &post_int,
822                                              &proc_pdp_cnt);
823
824     /* has a pdp_st moment occurred since the last run ? */
825     if (elapsed_pdp_st == 0) {
826         /* no we have not passed a pdp_st moment. therefore update is simple */
827         simple_update(rrd, interval, pdp_new);
828     } else {
829         /* an pdp_st has occurred. */
830         if (process_all_pdp_st(rrd, interval,
831                                pre_int, post_int,
832                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
833             return -1;
834         }
835         if (update_all_cdp_prep(rrd, rra_step_cnt,
836                                 rra_begin, rrd_file,
837                                 elapsed_pdp_st,
838                                 proc_pdp_cnt,
839                                 &last_seasonal_coef,
840                                 &seasonal_coef,
841                                 pdp_temp,
842                                 skip_update, schedule_smooth) == -1) {
843             goto err_free_coefficients;
844         }
845         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846                                  elapsed_pdp_st, pdp_temp,
847                                  &seasonal_coef) == -1) {
848             goto err_free_coefficients;
849         }
850         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851                           *current_time, skip_update,
852                           pcdp_summary) == -1) {
853             goto err_free_coefficients;
854         }
855     }                   /* endif a pdp_st has occurred */
856     rrd->live_head->last_up = *current_time;
857     rrd->live_head->last_up_usec = *current_time_usec;
858
859     if (version < 3) {
860         *rrd->legacy_last_up = rrd->live_head->last_up;
861     }
862     free(seasonal_coef);
863     free(last_seasonal_coef);
864     return 0;
865
866   err_free_coefficients:
867     free(seasonal_coef);
868     free(last_seasonal_coef);
869     return -1;
870 }
871
872 /*
873  * Parse a DS string (time + colon-separated values), storing the
874  * results in current_time, current_time_usec, and updvals.
875  *
876  * Returns 0 on success, -1 on error.
877  */
878 static int parse_ds(
879     rrd_t *rrd,
880     char **updvals,
881     long *tmpl_idx,
882     char *input,
883     unsigned long tmpl_cnt,
884     time_t *current_time,
885     unsigned long *current_time_usec,
886     int version)
887 {
888     char     *p;
889     unsigned long i;
890     char      timesyntax;
891
892     updvals[0] = input;
893     /* initialize all ds input to unknown except the first one
894        which has always got to be set */
895     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
896         updvals[i] = "U";
897
898     /* separate all ds elements; first must be examined separately
899        due to alternate time syntax */
900     if ((p = strchr(input, '@')) != NULL) {
901         timesyntax = '@';
902     } else if ((p = strchr(input, ':')) != NULL) {
903         timesyntax = ':';
904     } else {
905         rrd_set_error("expected timestamp not found in data source from %s",
906                       input);
907         return -1;
908     }
909     *p = '\0';
910     i = 1;
911     updvals[tmpl_idx[i++]] = p + 1;
912     while (*(++p)) {
913         if (*p == ':') {
914             *p = '\0';
915             if (i < tmpl_cnt) {
916                 updvals[tmpl_idx[i++]] = p + 1;
917             }
918             else {
919                 rrd_set_error("found extra data on update argument: %s",p+1);
920                 return -1;
921             }                
922         }
923     }
924
925     if (i != tmpl_cnt) {
926         rrd_set_error("expected %lu data source readings (got %lu) from %s",
927                       tmpl_cnt - 1, i - 1, input);
928         return -1;
929     }
930
931     if (get_time_from_reading(rrd, timesyntax, updvals,
932                               current_time, current_time_usec,
933                               version) == -1) {
934         return -1;
935     }
936     return 0;
937 }
938
939 /*
940  * Parse the time in a DS string, store it in current_time and 
941  * current_time_usec and verify that it's later than the last
942  * update for this DS.
943  *
944  * Returns 0 on success, -1 on error.
945  */
946 static int get_time_from_reading(
947     rrd_t *rrd,
948     char timesyntax,
949     char **updvals,
950     time_t *current_time,
951     unsigned long *current_time_usec,
952     int version)
953 {
954     double    tmp;
955     char     *parsetime_error = NULL;
956     char     *old_locale;
957     rrd_time_value_t ds_tv;
958     struct timeval tmp_time;    /* used for time conversion */
959
960     /* get the time from the reading ... handle N */
961     if (timesyntax == '@') {    /* at-style */
962         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
964             return -1;
965         }
966         if (ds_tv.type == RELATIVE_TO_END_TIME ||
967             ds_tv.type == RELATIVE_TO_START_TIME) {
968             rrd_set_error("specifying time relative to the 'start' "
969                           "or 'end' makes no sense here: %s", updvals[0]);
970             return -1;
971         }
972         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974     } else if (strcmp(updvals[0], "N") == 0) {
975         gettimeofday(&tmp_time, 0);
976         normalize_time(&tmp_time);
977         *current_time = tmp_time.tv_sec;
978         *current_time_usec = tmp_time.tv_usec;
979     } else {
980         old_locale = setlocale(LC_NUMERIC, "C");
981         errno = 0;
982         tmp = strtod(updvals[0], 0);
983         if (errno > 0) {
984             rrd_set_error("converting '%s' to float: %s",
985                 updvals[0], rrd_strerror(errno));
986             return -1;
987         };
988         setlocale(LC_NUMERIC, old_locale);
989         if (tmp < 0.0){
990             gettimeofday(&tmp_time, 0);
991             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
992         }
993
994         *current_time = floor(tmp);
995         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
996     }
997     /* dont do any correction for old version RRDs */
998     if (version < 3)
999         *current_time_usec = 0;
1000
1001     if (*current_time < rrd->live_head->last_up ||
1002         (*current_time == rrd->live_head->last_up &&
1003          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1004         rrd_set_error("illegal attempt to update using time %ld when "
1005                       "last update time is %ld (minimum one second step)",
1006                       *current_time, rrd->live_head->last_up);
1007         return -1;
1008     }
1009     return 0;
1010 }
1011
1012 /*
1013  * Update pdp_new by interpreting the updvals according to the DS type
1014  * (COUNTER, GAUGE, etc.).
1015  *
1016  * Returns 0 on success, -1 on error.
1017  */
1018 static int update_pdp_prep(
1019     rrd_t *rrd,
1020     char **updvals,
1021     rrd_value_t *pdp_new,
1022     double interval)
1023 {
1024     unsigned long ds_idx;
1025     int       ii;
1026     char     *endptr;   /* used in the conversion */
1027     double    rate;
1028     char     *old_locale;
1029     enum dst_en dst_idx;
1030
1031     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1032         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1033
1034         /* make sure we do not build diffs with old last_ds values */
1035         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1036             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1037             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1038         }
1039
1040         /* NOTE: DST_CDEF should never enter this if block, because
1041          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1042          * accidently specified a value for the DST_CDEF. To handle this case,
1043          * an extra check is required. */
1044
1045         if ((updvals[ds_idx + 1][0] != 'U') &&
1046             (dst_idx != DST_CDEF) &&
1047             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1048             rate = DNAN;
1049
1050             /* pdp_new contains rate * time ... eg the bytes transferred during
1051              * the interval. Doing it this way saves a lot of math operations
1052              */
1053             switch (dst_idx) {
1054             case DST_COUNTER:
1055             case DST_DERIVE:
1056                 /* Check if this is a valid integer. `U' is already handled in
1057                  * another branch. */
1058                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1059                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1060                             && (updvals[ds_idx + 1][ii] == '-'))
1061                         continue;
1062
1063                     if ((updvals[ds_idx + 1][ii] < '0')
1064                             || (updvals[ds_idx + 1][ii] > '9')) {
1065                         rrd_set_error("not a simple %s integer: '%s'",
1066                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1067                                 updvals[ds_idx + 1]);
1068                         return -1;
1069                     }
1070                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1071
1072                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1073                     pdp_new[ds_idx] =
1074                         rrd_diff(updvals[ds_idx + 1],
1075                                  rrd->pdp_prep[ds_idx].last_ds);
1076                     if (dst_idx == DST_COUNTER) {
1077                         /* simple overflow catcher. This will fail
1078                          * terribly for non 32 or 64 bit counters
1079                          * ... are there any others in SNMP land?
1080                          */
1081                         if (pdp_new[ds_idx] < (double) 0.0)
1082                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1083                         if (pdp_new[ds_idx] < (double) 0.0)
1084                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1085                     }
1086                     rate = pdp_new[ds_idx] / interval;
1087                 } else {
1088                     pdp_new[ds_idx] = DNAN;
1089                 }
1090                 break;
1091             case DST_ABSOLUTE:
1092                 old_locale = setlocale(LC_NUMERIC, "C");
1093                 errno = 0;
1094                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1095                 if (errno > 0) {
1096                     rrd_set_error("converting '%s' to float: %s",
1097                                   updvals[ds_idx + 1], rrd_strerror(errno));
1098                     return -1;
1099                 };
1100                 setlocale(LC_NUMERIC, old_locale);
1101                 if (endptr[0] != '\0') {
1102                     rrd_set_error
1103                         ("conversion of '%s' to float not complete: tail '%s'",
1104                          updvals[ds_idx + 1], endptr);
1105                     return -1;
1106                 }
1107                 rate = pdp_new[ds_idx] / interval;
1108                 break;
1109             case DST_GAUGE:
1110                 old_locale = setlocale(LC_NUMERIC, "C");
1111                 errno = 0;
1112                 pdp_new[ds_idx] =
1113                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1114                 if (errno) {
1115                     rrd_set_error("converting '%s' to float: %s",
1116                                   updvals[ds_idx + 1], rrd_strerror(errno));
1117                     return -1;
1118                 };
1119                 setlocale(LC_NUMERIC, old_locale);
1120                 if (endptr[0] != '\0') {
1121                     rrd_set_error
1122                         ("conversion of '%s' to float not complete: tail '%s'",
1123                          updvals[ds_idx + 1], endptr);
1124                     return -1;
1125                 }
1126                 rate = pdp_new[ds_idx] / interval;
1127                 break;
1128             default:
1129                 rrd_set_error("rrd contains unknown DS type : '%s'",
1130                               rrd->ds_def[ds_idx].dst);
1131                 return -1;
1132             }
1133             /* break out of this for loop if the error string is set */
1134             if (rrd_test_error()) {
1135                 return -1;
1136             }
1137             /* make sure pdp_temp is neither too large or too small
1138              * if any of these occur it becomes unknown ...
1139              * sorry folks ... */
1140             if (!isnan(rate) &&
1141                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1142                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1143                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1144                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1145                 pdp_new[ds_idx] = DNAN;
1146             }
1147         } else {
1148             /* no news is news all the same */
1149             pdp_new[ds_idx] = DNAN;
1150         }
1151
1152
1153         /* make a copy of the command line argument for the next run */
1154 #ifdef DEBUG
1155         fprintf(stderr, "prep ds[%lu]\t"
1156                 "last_arg '%s'\t"
1157                 "this_arg '%s'\t"
1158                 "pdp_new %10.2f\n",
1159                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1160                 pdp_new[ds_idx]);
1161 #endif
1162         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163                 LAST_DS_LEN - 1);
1164         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1165     }
1166     return 0;
1167 }
1168
1169 /*
1170  * How many PDP steps have elapsed since the last update? Returns the answer,
1171  * and stores the time between the last update and the last PDP in pre_time,
1172  * and the time between the last PDP and the current time in post_int.
1173  */
1174 static int calculate_elapsed_steps(
1175     rrd_t *rrd,
1176     unsigned long current_time,
1177     unsigned long current_time_usec,
1178     double interval,
1179     double *pre_int,
1180     double *post_int,
1181     unsigned long *proc_pdp_cnt)
1182 {
1183     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1184     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1185                                  * time */
1186     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1187                                  * when it was last updated */
1188     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1189
1190     /* when was the current pdp started */
1191     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1192     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1193
1194     /* when did the last pdp_st occur */
1195     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1196     occu_pdp_st = current_time - occu_pdp_age;
1197
1198     if (occu_pdp_st > proc_pdp_st) {
1199         /* OK we passed the pdp_st moment */
1200         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1201                                                                      * occurred before the latest
1202                                                                      * pdp_st moment*/
1203         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1204         *post_int = occu_pdp_age;   /* how much after it */
1205         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1206     } else {
1207         *pre_int = interval;
1208         *post_int = 0;
1209     }
1210
1211     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1212
1213 #ifdef DEBUG
1214     printf("proc_pdp_age %lu\t"
1215            "proc_pdp_st %lu\t"
1216            "occu_pfp_age %lu\t"
1217            "occu_pdp_st %lu\t"
1218            "int %lf\t"
1219            "pre_int %lf\t"
1220            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1221            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1222 #endif
1223
1224     /* compute the number of elapsed pdp_st moments */
1225     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1226 }
1227
1228 /*
1229  * Increment the PDP values by the values in pdp_new, or else initialize them.
1230  */
1231 static void simple_update(
1232     rrd_t *rrd,
1233     double interval,
1234     rrd_value_t *pdp_new)
1235 {
1236     int       i;
1237
1238     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1239         if (isnan(pdp_new[i])) {
1240             /* this is not really accurate if we use subsecond data arrival time
1241                should have thought of it when going subsecond resolution ...
1242                sorry next format change we will have it! */
1243             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1244                 floor(interval);
1245         } else {
1246             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1247                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1248             } else {
1249                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1250             }
1251         }
1252 #ifdef DEBUG
1253         fprintf(stderr,
1254                 "NO PDP  ds[%i]\t"
1255                 "value %10.2f\t"
1256                 "unkn_sec %5lu\n",
1257                 i,
1258                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1259                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1260 #endif
1261     }
1262 }
1263
1264 /*
1265  * Call process_pdp_st for each DS.
1266  *
1267  * Returns 0 on success, -1 on error.
1268  */
1269 static int process_all_pdp_st(
1270     rrd_t *rrd,
1271     double interval,
1272     double pre_int,
1273     double post_int,
1274     unsigned long elapsed_pdp_st,
1275     rrd_value_t *pdp_new,
1276     rrd_value_t *pdp_temp)
1277 {
1278     unsigned long ds_idx;
1279
1280     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1281        rate*seconds which occurred up to the last run.
1282        pdp_new[] contains rate*seconds from the latest run.
1283        pdp_temp[] will contain the rate for cdp */
1284
1285     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1286         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1287                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1288                            pdp_new, pdp_temp) == -1) {
1289             return -1;
1290         }
1291 #ifdef DEBUG
1292         fprintf(stderr, "PDP UPD ds[%lu]\t"
1293                 "elapsed_pdp_st %lu\t"
1294                 "pdp_temp %10.2f\t"
1295                 "new_prep %10.2f\t"
1296                 "new_unkn_sec %5lu\n",
1297                 ds_idx,
1298                 elapsed_pdp_st,
1299                 pdp_temp[ds_idx],
1300                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1301                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1302 #endif
1303     }
1304     return 0;
1305 }
1306
1307 /*
1308  * Process an update that occurs after one of the PDP moments.
1309  * Increments the PDP value, sets NAN if time greater than the
1310  * heartbeats have elapsed, processes CDEFs.
1311  *
1312  * Returns 0 on success, -1 on error.
1313  */
1314 static int process_pdp_st(
1315     rrd_t *rrd,
1316     unsigned long ds_idx,
1317     double interval,
1318     double pre_int,
1319     double post_int,
1320     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1321     rrd_value_t *pdp_new,
1322     rrd_value_t *pdp_temp)
1323 {
1324     int       i;
1325
1326     /* update pdp_prep to the current pdp_st. */
1327     double    pre_unknown = 0.0;
1328     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1329     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1330
1331     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1332
1333     rpnstack_init(&rpnstack);
1334
1335
1336     if (isnan(pdp_new[ds_idx])) {
1337         /* a final bit of unknown to be added before calculation
1338            we use a temporary variable for this so that we
1339            don't have to turn integer lines before using the value */
1340         pre_unknown = pre_int;
1341     } else {
1342         if (isnan(scratch[PDP_val].u_val)) {
1343             scratch[PDP_val].u_val = 0;
1344         }
1345         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1346     }
1347
1348     /* if too much of the pdp_prep is unknown we dump it */
1349     /* if the interval is larger thatn mrhb we get NAN */
1350     if ((interval > mrhb) ||
1351         (rrd->stat_head->pdp_step / 2.0 <
1352          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1353         pdp_temp[ds_idx] = DNAN;
1354     } else {
1355         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1356             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1357              pre_unknown);
1358     }
1359
1360     /* process CDEF data sources; remember each CDEF DS can
1361      * only reference other DS with a lower index number */
1362     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1363         rpnp_t   *rpnp;
1364
1365         rpnp =
1366             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1367         if(rpnp == NULL) {
1368           rpnstack_free(&rpnstack);
1369           return -1;
1370         }
1371         /* substitute data values for OP_VARIABLE nodes */
1372         for (i = 0; rpnp[i].op != OP_END; i++) {
1373             if (rpnp[i].op == OP_VARIABLE) {
1374                 rpnp[i].op = OP_NUMBER;
1375                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1376             }
1377         }
1378         /* run the rpn calculator */
1379         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1380             free(rpnp);
1381             rpnstack_free(&rpnstack);
1382             return -1;
1383         }
1384         free(rpnp);
1385     }
1386
1387     /* make pdp_prep ready for the next run */
1388     if (isnan(pdp_new[ds_idx])) {
1389         /* this is not realy accurate if we use subsecond data arival time
1390            should have thought of it when going subsecond resolution ...
1391            sorry next format change we will have it! */
1392         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1393         scratch[PDP_val].u_val = DNAN;
1394     } else {
1395         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1396         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1397     }
1398     rpnstack_free(&rpnstack);
1399     return 0;
1400 }
1401
1402 /*
1403  * Iterate over all the RRAs for a given DS and:
1404  * 1. Decide whether to schedule a smooth later
1405  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1406  * 3. Update the CDP
1407  *
1408  * Returns 0 on success, -1 on error
1409  */
1410 static int update_all_cdp_prep(
1411     rrd_t *rrd,
1412     unsigned long *rra_step_cnt,
1413     unsigned long rra_begin,
1414     rrd_file_t *rrd_file,
1415     unsigned long elapsed_pdp_st,
1416     unsigned long proc_pdp_cnt,
1417     rrd_value_t **last_seasonal_coef,
1418     rrd_value_t **seasonal_coef,
1419     rrd_value_t *pdp_temp,
1420     unsigned long *skip_update,
1421     int *schedule_smooth)
1422 {
1423     unsigned long rra_idx;
1424
1425     /* index into the CDP scratch array */
1426     enum cf_en current_cf;
1427     unsigned long rra_start;
1428
1429     /* number of rows to be updated in an RRA for a data value. */
1430     unsigned long start_pdp_offset;
1431
1432     rra_start = rra_begin;
1433     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1434         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1435         start_pdp_offset =
1436             rrd->rra_def[rra_idx].pdp_cnt -
1437             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1438         skip_update[rra_idx] = 0;
1439         if (start_pdp_offset <= elapsed_pdp_st) {
1440             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1441                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1442         } else {
1443             rra_step_cnt[rra_idx] = 0;
1444         }
1445
1446         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1447             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1448              * so that they will be correct for the next observed value; note that for
1449              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1450              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1451             if (rra_step_cnt[rra_idx] > 1) {
1452                 skip_update[rra_idx] = 1;
1453                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454                                 elapsed_pdp_st, last_seasonal_coef);
1455                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1456                                 elapsed_pdp_st + 1, seasonal_coef);
1457             }
1458             /* periodically run a smoother for seasonal effects */
1459             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1460 #ifdef DEBUG
1461                 fprintf(stderr,
1462                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1463                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1464                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1465                         u_cnt);
1466 #endif
1467                 *schedule_smooth = 1;
1468             }
1469         }
1470         if (rrd_test_error())
1471             return -1;
1472
1473         if (update_cdp_prep
1474             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1475              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1476              current_cf) == -1) {
1477             return -1;
1478         }
1479         rra_start +=
1480             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1481             sizeof(rrd_value_t);
1482     }
1483     return 0;
1484 }
1485
1486 /* 
1487  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1488  */
1489 static int do_schedule_smooth(
1490     rrd_t *rrd,
1491     unsigned long rra_idx,
1492     unsigned long elapsed_pdp_st)
1493 {
1494     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1495     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1496     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1497     unsigned long seasonal_smooth_idx =
1498         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1499     unsigned long *init_seasonal =
1500         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1501
1502     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1503      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1504      * really an RRA level, not a data source within RRA level parameter, but
1505      * the rra_def is read only for rrd_update (not flushed to disk). */
1506     if (*init_seasonal > BURNIN_CYCLES) {
1507         /* someone has no doubt invented a trick to deal with this wrap around,
1508          * but at least this code is clear. */
1509         if (seasonal_smooth_idx > cur_row) {
1510             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1511              * between PDP and CDP */
1512             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1513         }
1514         /* can't rely on negative numbers because we are working with
1515          * unsigned values */
1516         return (cur_row + elapsed_pdp_st >= row_cnt
1517                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1518     }
1519     /* mark off one of the burn-in cycles */
1520     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1521 }
1522
1523 /*
1524  * For a given RRA, iterate over the data sources and call the appropriate
1525  * consolidation function.
1526  *
1527  * Returns 0 on success, -1 on error.
1528  */
1529 static int update_cdp_prep(
1530     rrd_t *rrd,
1531     unsigned long elapsed_pdp_st,
1532     unsigned long start_pdp_offset,
1533     unsigned long *rra_step_cnt,
1534     int rra_idx,
1535     rrd_value_t *pdp_temp,
1536     rrd_value_t *last_seasonal_coef,
1537     rrd_value_t *seasonal_coef,
1538     int current_cf)
1539 {
1540     unsigned long ds_idx, cdp_idx;
1541
1542     /* update CDP_PREP areas */
1543     /* loop over data soures within each RRA */
1544     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1545
1546         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1547
1548         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1549             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1550                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1551                        elapsed_pdp_st, start_pdp_offset,
1552                        rrd->rra_def[rra_idx].pdp_cnt,
1553                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1554                        rra_idx, ds_idx);
1555         } else {
1556             /* Nothing to consolidate if there's one PDP per CDP. However, if
1557              * we've missed some PDPs, let's update null counters etc. */
1558             if (elapsed_pdp_st > 2) {
1559                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1560                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1561                           (cf_en)current_cf);
1562             }
1563         }
1564
1565         if (rrd_test_error())
1566             return -1;
1567     }                   /* endif data sources loop */
1568     return 0;
1569 }
1570
1571 /*
1572  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1573  * primary value, secondary value, and # of unknowns.
1574  */
1575 static void update_cdp(
1576     unival *scratch,
1577     int current_cf,
1578     rrd_value_t pdp_temp_val,
1579     unsigned long rra_step_cnt,
1580     unsigned long elapsed_pdp_st,
1581     unsigned long start_pdp_offset,
1582     unsigned long pdp_cnt,
1583     rrd_value_t xff,
1584     int i,
1585     int ii)
1586 {
1587     /* shorthand variables */
1588     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1589     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1590     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1591     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1592
1593     if (rra_step_cnt) {
1594         /* If we are in this block, as least 1 CDP value will be written to
1595          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1596          * to be written, then the "fill in" value is the CDP_secondary_val
1597          * entry. */
1598         if (isnan(pdp_temp_val)) {
1599             *cdp_unkn_pdp_cnt += start_pdp_offset;
1600             *cdp_secondary_val = DNAN;
1601         } else {
1602             /* CDP_secondary value is the RRA "fill in" value for intermediary
1603              * CDP data entries. No matter the CF, the value is the same because
1604              * the average, max, min, and last of a list of identical values is
1605              * the same, namely, the value itself. */
1606             *cdp_secondary_val = pdp_temp_val;
1607         }
1608
1609         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1610             *cdp_primary_val = DNAN;
1611         } else {
1612             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1613                                start_pdp_offset, pdp_cnt);
1614         }
1615         *cdp_val =
1616             initialize_carry_over(pdp_temp_val,current_cf,
1617                                   elapsed_pdp_st,
1618                                   start_pdp_offset, pdp_cnt);
1619                /* endif meets xff value requirement for a valid value */
1620         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1621          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1622         if (isnan(pdp_temp_val))
1623             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1624         else
1625             *cdp_unkn_pdp_cnt = 0;
1626     } else {            /* rra_step_cnt[i]  == 0 */
1627
1628 #ifdef DEBUG
1629         if (isnan(*cdp_val)) {
1630             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1631                     i, ii);
1632         } else {
1633             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1634                     i, ii, *cdp_val);
1635         }
1636 #endif
1637         if (isnan(pdp_temp_val)) {
1638             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1639         } else {
1640             *cdp_val =
1641                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1642                                   current_cf, i, ii);
1643         }
1644     }
1645 }
1646
1647 /*
1648  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1649  * on the type of consolidation function.
1650  */
1651 static void initialize_cdp_val(
1652     unival *scratch,
1653     int current_cf,
1654     rrd_value_t pdp_temp_val,
1655     unsigned long start_pdp_offset,
1656     unsigned long pdp_cnt)
1657 {
1658     rrd_value_t cum_val, cur_val;
1659
1660     switch (current_cf) {
1661     case CF_AVERAGE:
1662         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1663         cur_val = IFDNAN(pdp_temp_val, 0.0);
1664         scratch[CDP_primary_val].u_val =
1665             (cum_val + cur_val * start_pdp_offset) /
1666             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1667         break;
1668     case CF_MAXIMUM: 
1669         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1670         cur_val = IFDNAN(pdp_temp_val, -DINF);
1671
1672 #if 0
1673 #ifdef DEBUG
1674         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1675             fprintf(stderr,
1676                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1677                     i, ii);
1678             exit(-1);
1679         }
1680 #endif
1681 #endif
1682         if (cur_val > cum_val)
1683             scratch[CDP_primary_val].u_val = cur_val;
1684         else
1685             scratch[CDP_primary_val].u_val = cum_val;
1686         break;
1687     case CF_MINIMUM:
1688         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1689         cur_val = IFDNAN(pdp_temp_val, DINF);
1690 #if 0
1691 #ifdef DEBUG
1692         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1693             fprintf(stderr,
1694                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1695                     ii);
1696             exit(-1);
1697         }
1698 #endif
1699 #endif
1700         if (cur_val < cum_val)
1701             scratch[CDP_primary_val].u_val = cur_val;
1702         else
1703             scratch[CDP_primary_val].u_val = cum_val;
1704         break;
1705     case CF_LAST:
1706     default:
1707         scratch[CDP_primary_val].u_val = pdp_temp_val;
1708         break;
1709     }
1710 }
1711
1712 /*
1713  * Update the consolidation function for Holt-Winters functions as
1714  * well as other functions that don't actually consolidate multiple
1715  * PDPs.
1716  */
1717 static void reset_cdp(
1718     rrd_t *rrd,
1719     unsigned long elapsed_pdp_st,
1720     rrd_value_t *pdp_temp,
1721     rrd_value_t *last_seasonal_coef,
1722     rrd_value_t *seasonal_coef,
1723     int rra_idx,
1724     int ds_idx,
1725     int cdp_idx,
1726     enum cf_en current_cf)
1727 {
1728     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1729
1730     switch (current_cf) {
1731     case CF_AVERAGE:
1732     default:
1733         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1734         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1735         break;
1736     case CF_SEASONAL:
1737     case CF_DEVSEASONAL:
1738         /* need to update cached seasonal values, so they are consistent
1739          * with the bulk update */
1740         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1741          * CDP_last_deviation are the same. */
1742         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1743         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1744         break;
1745     case CF_HWPREDICT:
1746     case CF_MHWPREDICT:
1747         /* need to update the null_count and last_null_count.
1748          * even do this for non-DNAN pdp_temp because the
1749          * algorithm is not learning from batch updates. */
1750         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1751         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1752         /* fall through */
1753     case CF_DEVPREDICT:
1754         scratch[CDP_primary_val].u_val = DNAN;
1755         scratch[CDP_secondary_val].u_val = DNAN;
1756         break;
1757     case CF_FAILURES:
1758         /* do not count missed bulk values as failures */
1759         scratch[CDP_primary_val].u_val = 0;
1760         scratch[CDP_secondary_val].u_val = 0;
1761         /* need to reset violations buffer.
1762          * could do this more carefully, but for now, just
1763          * assume a bulk update wipes away all violations. */
1764         erase_violations(rrd, cdp_idx, rra_idx);
1765         break;
1766     }
1767 }
1768
1769 static rrd_value_t initialize_carry_over(
1770     rrd_value_t pdp_temp_val,
1771     int current_cf,
1772     unsigned long elapsed_pdp_st,
1773     unsigned long start_pdp_offset,
1774     unsigned long pdp_cnt)
1775 {
1776     unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1777     if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1778         switch (current_cf) {
1779         case CF_MAXIMUM:
1780             return -DINF;
1781         case CF_MINIMUM:
1782             return DINF;
1783         case CF_AVERAGE:
1784             return 0;
1785         default:
1786             return DNAN;
1787         }        
1788     } 
1789     else {
1790         switch (current_cf) {
1791         case CF_AVERAGE:
1792             return pdp_temp_val *  pdp_into_cdp_cnt ;
1793         default:
1794             return pdp_temp_val;
1795         }        
1796     }        
1797 }
1798
1799 /*
1800  * Update or initialize a CDP value based on the consolidation
1801  * function.
1802  *
1803  * Returns the new value.
1804  */
1805 static rrd_value_t calculate_cdp_val(
1806     rrd_value_t cdp_val,
1807     rrd_value_t pdp_temp_val,
1808     unsigned long elapsed_pdp_st,
1809     int current_cf,
1810 #ifdef DEBUG
1811     int i,
1812     int ii
1813 #else
1814     int UNUSED(i),
1815     int UNUSED(ii)
1816 #endif
1817     )
1818 {
1819     if (isnan(cdp_val)) {
1820         if (current_cf == CF_AVERAGE) {
1821             pdp_temp_val *= elapsed_pdp_st;
1822         }
1823 #ifdef DEBUG
1824         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1825                 i, ii, pdp_temp_val);
1826 #endif
1827         return pdp_temp_val;
1828     }
1829     if (current_cf == CF_AVERAGE)
1830         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1831     if (current_cf == CF_MINIMUM)
1832         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1833     if (current_cf == CF_MAXIMUM)
1834         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1835
1836     return pdp_temp_val;
1837 }
1838
1839 /*
1840  * For each RRA, update the seasonal values and then call update_aberrant_CF
1841  * for each data source.
1842  *
1843  * Return 0 on success, -1 on error.
1844  */
1845 static int update_aberrant_cdps(
1846     rrd_t *rrd,
1847     rrd_file_t *rrd_file,
1848     unsigned long rra_begin,
1849     unsigned long elapsed_pdp_st,
1850     rrd_value_t *pdp_temp,
1851     rrd_value_t **seasonal_coef)
1852 {
1853     unsigned long rra_idx, ds_idx, j;
1854
1855     /* number of PDP steps since the last update that
1856      * are assigned to the first CDP to be generated
1857      * since the last update. */
1858     unsigned short scratch_idx;
1859     unsigned long rra_start;
1860     enum cf_en current_cf;
1861
1862     /* this loop is only entered if elapsed_pdp_st < 3 */
1863     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1864          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1865         rra_start = rra_begin;
1866         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1867             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1868                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1869                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1870                     if (scratch_idx == CDP_primary_val) {
1871                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1872                                         elapsed_pdp_st + 1, seasonal_coef);
1873                     } else {
1874                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875                                         elapsed_pdp_st + 2, seasonal_coef);
1876                     }
1877                 }
1878                 if (rrd_test_error())
1879                     return -1;
1880                 /* loop over data soures within each RRA */
1881                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1882                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1883                                        rra_idx * (rrd->stat_head->ds_cnt) +
1884                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1885                                        *seasonal_coef);
1886                 }
1887             }
1888             rra_start += rrd->rra_def[rra_idx].row_cnt
1889                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1890         }
1891     }
1892     return 0;
1893 }
1894
1895 /* 
1896  * Move sequentially through the file, writing one RRA at a time.  Note this
1897  * architecture divorces the computation of CDP with flushing updated RRA
1898  * entries to disk.
1899  *
1900  * Return 0 on success, -1 on error.
1901  */
1902 static int write_to_rras(
1903     rrd_t *rrd,
1904     rrd_file_t *rrd_file,
1905     unsigned long *rra_step_cnt,
1906     unsigned long rra_begin,
1907     time_t current_time,
1908     unsigned long *skip_update,
1909     rrd_info_t ** pcdp_summary)
1910 {
1911     unsigned long rra_idx;
1912     unsigned long rra_start;
1913     time_t    rra_time = 0; /* time of update for a RRA */
1914
1915     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1916     
1917     /* Ready to write to disk */
1918     rra_start = rra_begin;
1919
1920     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1921         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1922         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1923
1924         /* for cdp_prep */
1925         unsigned short scratch_idx;
1926         unsigned long step_subtract;
1927
1928         for (scratch_idx = CDP_primary_val,
1929                  step_subtract = 1;
1930              rra_step_cnt[rra_idx] > 0;
1931              rra_step_cnt[rra_idx]--,
1932                  scratch_idx = CDP_secondary_val,
1933                  step_subtract = 2) {
1934
1935             size_t rra_pos_new;
1936 #ifdef DEBUG
1937             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1938 #endif
1939             /* increment, with wrap-around */
1940             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1941               rra_ptr->cur_row = 0;
1942
1943             /* we know what our position should be */
1944             rra_pos_new = rra_start
1945               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1946
1947             /* re-seek if the position is wrong or we wrapped around */
1948             if ((size_t)rra_pos_new != rrd_file->pos) {
1949                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1950                     rrd_set_error("seek error in rrd");
1951                     return -1;
1952                 }
1953             }
1954 #ifdef DEBUG
1955             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1956 #endif
1957
1958             if (skip_update[rra_idx])
1959                 continue;
1960
1961             if (*pcdp_summary != NULL) {
1962                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1963
1964                 rra_time = (current_time - current_time % step_time)
1965                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1966             }
1967
1968             if (write_RRA_row
1969                 (rrd_file, rrd, rra_idx, scratch_idx,
1970                  pcdp_summary, rra_time) == -1)
1971                 return -1;
1972
1973             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1974         }
1975
1976         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1977     } /* RRA LOOP */
1978
1979     return 0;
1980 }
1981
1982 /*
1983  * Write out one row of values (one value per DS) to the archive.
1984  *
1985  * Returns 0 on success, -1 on error.
1986  */
1987 static int write_RRA_row(
1988     rrd_file_t *rrd_file,
1989     rrd_t *rrd,
1990     unsigned long rra_idx,
1991     unsigned short CDP_scratch_idx,
1992     rrd_info_t ** pcdp_summary,
1993     time_t rra_time)
1994 {
1995     unsigned long ds_idx, cdp_idx;
1996     rrd_infoval_t iv;
1997
1998     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1999         /* compute the cdp index */
2000         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2001 #ifdef DEBUG
2002         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2003                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2004                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2005 #endif
2006         if (*pcdp_summary != NULL) {
2007             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2008             /* append info to the return hash */
2009             *pcdp_summary = rrd_info_push(*pcdp_summary,
2010                                           sprintf_alloc
2011                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
2012                                            (long long)rra_time,
2013                                            rrd->rra_def[rra_idx].cf_nam,
2014                                            rrd->rra_def[rra_idx].pdp_cnt,
2015                                            rrd->ds_def[ds_idx].ds_nam),
2016                                            RD_I_VAL, iv);
2017         }
2018         errno = 0;
2019         if (rrd_write(rrd_file,
2020                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2021                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2022             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2023             return -1;
2024         }
2025     }
2026     return 0;
2027 }
2028
2029 /*
2030  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2031  *
2032  * Returns 0 on success, -1 otherwise
2033  */
2034 static int smooth_all_rras(
2035     rrd_t *rrd,
2036     rrd_file_t *rrd_file,
2037     unsigned long rra_begin)
2038 {
2039     unsigned long rra_start = rra_begin;
2040     unsigned long rra_idx;
2041
2042     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2043         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2044             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2045 #ifdef DEBUG
2046             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2047 #endif
2048             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2049             if (rrd_test_error())
2050                 return -1;
2051         }
2052         rra_start += rrd->rra_def[rra_idx].row_cnt
2053             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2054     }
2055     return 0;
2056 }
2057
2058 #ifndef HAVE_MMAP
2059 /*
2060  * Flush changes to disk (unless we're using mmap)
2061  *
2062  * Returns 0 on success, -1 otherwise
2063  */
2064 static int write_changes_to_disk(
2065     rrd_t *rrd,
2066     rrd_file_t *rrd_file,
2067     int version)
2068 {
2069     /* we just need to write back the live header portion now */
2070     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2071                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2072                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2073                  SEEK_SET) != 0) {
2074         rrd_set_error("seek rrd for live header writeback");
2075         return -1;
2076     }
2077     if (version >= 3) {
2078         if (rrd_write(rrd_file, rrd->live_head,
2079                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2080             rrd_set_error("rrd_write live_head to rrd");
2081             return -1;
2082         }
2083     } else {
2084         if (rrd_write(rrd_file, rrd->legacy_last_up,
2085                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2086             rrd_set_error("rrd_write live_head to rrd");
2087             return -1;
2088         }
2089     }
2090
2091
2092     if (rrd_write(rrd_file, rrd->pdp_prep,
2093                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2094         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2095         rrd_set_error("rrd_write pdp_prep to rrd");
2096         return -1;
2097     }
2098
2099     if (rrd_write(rrd_file, rrd->cdp_prep,
2100                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2101                   rrd->stat_head->ds_cnt)
2102         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2103                       rrd->stat_head->ds_cnt)) {
2104
2105         rrd_set_error("rrd_write cdp_prep to rrd");
2106         return -1;
2107     }
2108
2109     if (rrd_write(rrd_file, rrd->rra_ptr,
2110                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2111         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2112         rrd_set_error("rrd_write rra_ptr to rrd");
2113         return -1;
2114     }
2115     return 0;
2116 }
2117 #endif