follow the normal code path for exiting rrd_update if there is a problem with rrdc...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.3  Copyright by Tobi Oetiker, 1997-2010
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #include "rrd_client.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
63
64 /* FUNCTION PROTOTYPES */
65
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
77
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
88
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
94
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
112
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
122
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
130
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
136
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
145
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
150
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
159
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
169
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
217
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
228
229 static rrd_value_t initialize_carry_over(
230     rrd_value_t pdp_temp_val,
231     int         current_cf,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
251
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
260
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
268
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
273
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
280
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
287 {
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
292 }
293
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
302 {
303     struct timeval tmp_time;    /* used for time conversion */
304
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
313 }
314
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
316
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
320 {
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
329
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
333
334     while (1) {
335         int       option_index = 0;
336         int       opt;
337
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339
340         if (opt == EOF)
341             break;
342
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
347
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
353
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
362
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
376 }
377
378 int rrd_update(
379     int argc,
380     char **argv)
381 {
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
392
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
395
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
398
399         if (opt == EOF)
400             break;
401
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
406
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
417
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
423
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
429
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) {        
433              rc = status;           
434              goto out;           
435         }        
436     }
437
438     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
439     {
440         rrd_set_error("The caching daemon cannot be used together with "
441                 "templates yet.");
442         goto out;
443     }
444
445     if (! rrdc_is_connected(opt_daemon))
446     {
447       rc = rrd_update_r(argv[optind], tmplt,
448                         argc - optind - 1, (const char **) (argv + optind + 1));
449     }
450     else /* we are connected */
451     {
452         rc = rrdc_update (argv[optind], /* file */
453                           argc - optind - 1, /* values_num */
454                           (const char *const *) (argv + optind + 1)); /* values */
455         if (rc > 0)
456             rrd_set_error("Failed sending the values to rrdcached: %s",
457                           rrd_strerror (rc));
458     }
459
460   out:
461     if (tmplt != NULL)
462     {
463         free(tmplt);
464         tmplt = NULL;
465     }
466     if (opt_daemon != NULL)
467     {
468         free (opt_daemon);
469         opt_daemon = NULL;
470     }
471     return rc;
472 }
473
474 int rrd_update_r(
475     const char *filename,
476     const char *tmplt,
477     int argc,
478     const char **argv)
479 {
480     return _rrd_update(filename, tmplt, argc, argv, NULL);
481 }
482
483 int rrd_update_v_r(
484     const char *filename,
485     const char *tmplt,
486     int argc,
487     const char **argv,
488     rrd_info_t * pcdp_summary)
489 {
490     return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
491 }
492
493 int _rrd_update(
494     const char *filename,
495     const char *tmplt,
496     int argc,
497     const char **argv,
498     rrd_info_t * pcdp_summary)
499 {
500
501     int       arg_i = 2;
502
503     unsigned long rra_begin;    /* byte pointer to the rra
504                                  * area in the rrd file.  this
505                                  * pointer never changes value */
506     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
507                              * to the existing entry */
508     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
509                              * to the cdp values */
510
511     long     *tmpl_idx; /* index representing the settings
512                          * transported by the tmplt index */
513     unsigned long tmpl_cnt = 2; /* time and data */
514     rrd_t     rrd;
515     time_t    current_time = 0;
516     unsigned long current_time_usec = 0;    /* microseconds part of current time */
517     char    **updvals;
518     int       schedule_smooth = 0;
519
520     /* number of elapsed PDP steps since last update */
521     unsigned long *rra_step_cnt = NULL;
522
523     int       version;  /* rrd version */
524     rrd_file_t *rrd_file;
525     char     *arg_copy; /* for processing the argv */
526     unsigned long *skip_update; /* RRAs to advance but not write */
527
528     /* need at least 1 arguments: data. */
529     if (argc < 1) {
530         rrd_set_error("Not enough arguments");
531         goto err_out;
532     }
533
534     rrd_init(&rrd);
535     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
536         goto err_free;
537     }
538     /* We are now at the beginning of the rra's */
539     rra_begin = rrd_file->header_len;
540
541     version = atoi(rrd.stat_head->version);
542
543     initialize_time(&current_time, &current_time_usec, version);
544
545     /* get exclusive lock to whole file.
546      * lock gets removed when we close the file.
547      */
548     if (rrd_lock(rrd_file) != 0) {
549         rrd_set_error("could not lock RRD");
550         goto err_close;
551     }
552
553     if (allocate_data_structures(&rrd, &updvals,
554                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
555                                  &rra_step_cnt, &skip_update,
556                                  &pdp_new) == -1) {
557         goto err_close;
558     }
559
560     /* loop through the arguments. */
561     for (arg_i = 0; arg_i < argc; arg_i++) {
562         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
563             rrd_set_error("failed duplication argv entry");
564             break;
565         }
566         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
567                         &current_time, &current_time_usec, pdp_temp, pdp_new,
568                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
569                         &pcdp_summary, version, skip_update,
570                         &schedule_smooth) == -1) {
571             if (rrd_test_error()) { /* Should have error string always here */
572                 char     *save_error;
573
574                 /* Prepend file name to error message */
575                 if ((save_error = strdup(rrd_get_error())) != NULL) {
576                     rrd_set_error("%s: %s", filename, save_error);
577                     free(save_error);
578                 }
579             }
580             free(arg_copy);
581             break;
582         }
583         free(arg_copy);
584     }
585
586     free(rra_step_cnt);
587
588     /* if we got here and if there is an error and if the file has not been
589      * written to, then close things up and return. */
590     if (rrd_test_error()) {
591         goto err_free_structures;
592     }
593 #ifndef HAVE_MMAP
594     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
595         goto err_free_structures;
596     }
597 #endif
598
599     /* calling the smoothing code here guarantees at most one smoothing
600      * operation per rrd_update call. Unfortunately, it is possible with bulk
601      * updates, or a long-delayed update for smoothing to occur off-schedule.
602      * This really isn't critical except during the burn-in cycles. */
603     if (schedule_smooth) {
604         smooth_all_rras(&rrd, rrd_file, rra_begin);
605     }
606
607 /*    rrd_dontneed(rrd_file,&rrd); */
608     rrd_free(&rrd);
609     rrd_close(rrd_file);
610
611     free(pdp_new);
612     free(tmpl_idx);
613     free(pdp_temp);
614     free(skip_update);
615     free(updvals);
616     return 0;
617
618   err_free_structures:
619     free(pdp_new);
620     free(tmpl_idx);
621     free(pdp_temp);
622     free(skip_update);
623     free(updvals);
624   err_close:
625     rrd_close(rrd_file);
626   err_free:
627     rrd_free(&rrd);
628   err_out:
629     return -1;
630 }
631
632 /*
633  * Allocate some important arrays used, and initialize the template.
634  *
635  * When it returns, either all of the structures are allocated
636  * or none of them are.
637  *
638  * Returns 0 on success, -1 on error.
639  */
640 static int allocate_data_structures(
641     rrd_t *rrd,
642     char ***updvals,
643     rrd_value_t **pdp_temp,
644     const char *tmplt,
645     long **tmpl_idx,
646     unsigned long *tmpl_cnt,
647     unsigned long **rra_step_cnt,
648     unsigned long **skip_update,
649     rrd_value_t **pdp_new)
650 {
651     unsigned  i, ii;
652     if ((*updvals = (char **) malloc(sizeof(char *)
653                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
654         rrd_set_error("allocating updvals pointer array.");
655         return -1;
656     }
657     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
658                                             * rrd->stat_head->ds_cnt)) ==
659         NULL) {
660         rrd_set_error("allocating pdp_temp.");
661         goto err_free_updvals;
662     }
663     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
664                                                  *
665                                                  rrd->stat_head->rra_cnt)) ==
666         NULL) {
667         rrd_set_error("allocating skip_update.");
668         goto err_free_pdp_temp;
669     }
670     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
671                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
672         rrd_set_error("allocating tmpl_idx.");
673         goto err_free_skip_update;
674     }
675     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
676                                                   *
677                                                   (rrd->stat_head->
678                                                    rra_cnt))) == NULL) {
679         rrd_set_error("allocating rra_step_cnt.");
680         goto err_free_tmpl_idx;
681     }
682
683     /* initialize tmplt redirector */
684     /* default config example (assume DS 1 is a CDEF DS)
685        tmpl_idx[0] -> 0; (time)
686        tmpl_idx[1] -> 1; (DS 0)
687        tmpl_idx[2] -> 3; (DS 2)
688        tmpl_idx[3] -> 4; (DS 3) */
689     (*tmpl_idx)[0] = 0; /* time */
690     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
691         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
692             (*tmpl_idx)[ii++] = i;
693     }
694     *tmpl_cnt = ii;
695
696     if (tmplt != NULL) {
697         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
698             goto err_free_rra_step_cnt;
699         }
700     }
701
702     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
703                                            * rrd->stat_head->ds_cnt)) == NULL) {
704         rrd_set_error("allocating pdp_new.");
705         goto err_free_rra_step_cnt;
706     }
707
708     return 0;
709
710   err_free_rra_step_cnt:
711     free(*rra_step_cnt);
712   err_free_tmpl_idx:
713     free(*tmpl_idx);
714   err_free_skip_update:
715     free(*skip_update);
716   err_free_pdp_temp:
717     free(*pdp_temp);
718   err_free_updvals:
719     free(*updvals);
720     return -1;
721 }
722
723 /*
724  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
725  *
726  * Returns 0 on success.
727  */
728 static int parse_template(
729     rrd_t *rrd,
730     const char *tmplt,
731     unsigned long *tmpl_cnt,
732     long *tmpl_idx)
733 {
734     char     *dsname, *tmplt_copy;
735     unsigned int tmpl_len, i;
736     int       ret = 0;
737
738     *tmpl_cnt = 1;      /* the first entry is the time */
739
740     /* we should work on a writeable copy here */
741     if ((tmplt_copy = strdup(tmplt)) == NULL) {
742         rrd_set_error("error copying tmplt '%s'", tmplt);
743         ret = -1;
744         goto out;
745     }
746
747     dsname = tmplt_copy;
748     tmpl_len = strlen(tmplt_copy);
749     for (i = 0; i <= tmpl_len; i++) {
750         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
751             tmplt_copy[i] = '\0';
752             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
753                 rrd_set_error("tmplt contains more DS definitions than RRD");
754                 ret = -1;
755                 goto out_free_tmpl_copy;
756             }
757             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
758                 rrd_set_error("unknown DS name '%s'", dsname);
759                 ret = -1;
760                 goto out_free_tmpl_copy;
761             }
762             /* go to the next entry on the tmplt_copy */
763             if (i < tmpl_len)
764                 dsname = &tmplt_copy[i + 1];
765         }
766     }
767   out_free_tmpl_copy:
768     free(tmplt_copy);
769   out:
770     return ret;
771 }
772
773 /*
774  * Parse an update string, updates the primary data points (PDPs)
775  * and consolidated data points (CDPs), and writes changes to the RRAs.
776  *
777  * Returns 0 on success, -1 on error.
778  */
779 static int process_arg(
780     char *step_start,
781     rrd_t *rrd,
782     rrd_file_t *rrd_file,
783     unsigned long rra_begin,
784     time_t *current_time,
785     unsigned long *current_time_usec,
786     rrd_value_t *pdp_temp,
787     rrd_value_t *pdp_new,
788     unsigned long *rra_step_cnt,
789     char **updvals,
790     long *tmpl_idx,
791     unsigned long tmpl_cnt,
792     rrd_info_t ** pcdp_summary,
793     int version,
794     unsigned long *skip_update,
795     int *schedule_smooth)
796 {
797     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
798
799     /* a vector of future Holt-Winters seasonal coefs */
800     unsigned long elapsed_pdp_st;
801
802     double    interval, pre_int, post_int;  /* interval between this and
803                                              * the last run */
804     unsigned long proc_pdp_cnt;
805
806     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
807                  current_time, current_time_usec, version) == -1) {
808         return -1;
809     }
810
811     interval = (double) (*current_time - rrd->live_head->last_up)
812         + (double) ((long) *current_time_usec -
813                     (long) rrd->live_head->last_up_usec) / 1e6f;
814
815     /* process the data sources and update the pdp_prep 
816      * area accordingly */
817     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
818         return -1;
819     }
820
821     elapsed_pdp_st = calculate_elapsed_steps(rrd,
822                                              *current_time,
823                                              *current_time_usec, interval,
824                                              &pre_int, &post_int,
825                                              &proc_pdp_cnt);
826
827     /* has a pdp_st moment occurred since the last run ? */
828     if (elapsed_pdp_st == 0) {
829         /* no we have not passed a pdp_st moment. therefore update is simple */
830         simple_update(rrd, interval, pdp_new);
831     } else {
832         /* an pdp_st has occurred. */
833         if (process_all_pdp_st(rrd, interval,
834                                pre_int, post_int,
835                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
836             return -1;
837         }
838         if (update_all_cdp_prep(rrd, rra_step_cnt,
839                                 rra_begin, rrd_file,
840                                 elapsed_pdp_st,
841                                 proc_pdp_cnt,
842                                 &last_seasonal_coef,
843                                 &seasonal_coef,
844                                 pdp_temp,
845                                 skip_update, schedule_smooth) == -1) {
846             goto err_free_coefficients;
847         }
848         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
849                                  elapsed_pdp_st, pdp_temp,
850                                  &seasonal_coef) == -1) {
851             goto err_free_coefficients;
852         }
853         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
854                           *current_time, skip_update,
855                           pcdp_summary) == -1) {
856             goto err_free_coefficients;
857         }
858     }                   /* endif a pdp_st has occurred */
859     rrd->live_head->last_up = *current_time;
860     rrd->live_head->last_up_usec = *current_time_usec;
861
862     if (version < 3) {
863         *rrd->legacy_last_up = rrd->live_head->last_up;
864     }
865     free(seasonal_coef);
866     free(last_seasonal_coef);
867     return 0;
868
869   err_free_coefficients:
870     free(seasonal_coef);
871     free(last_seasonal_coef);
872     return -1;
873 }
874
875 /*
876  * Parse a DS string (time + colon-separated values), storing the
877  * results in current_time, current_time_usec, and updvals.
878  *
879  * Returns 0 on success, -1 on error.
880  */
881 static int parse_ds(
882     rrd_t *rrd,
883     char **updvals,
884     long *tmpl_idx,
885     char *input,
886     unsigned long tmpl_cnt,
887     time_t *current_time,
888     unsigned long *current_time_usec,
889     int version)
890 {
891     char     *p;
892     unsigned long i;
893     char      timesyntax;
894
895     updvals[0] = input;
896     /* initialize all ds input to unknown except the first one
897        which has always got to be set */
898     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
899         updvals[i] = "U";
900
901     /* separate all ds elements; first must be examined separately
902        due to alternate time syntax */
903     if ((p = strchr(input, '@')) != NULL) {
904         timesyntax = '@';
905     } else if ((p = strchr(input, ':')) != NULL) {
906         timesyntax = ':';
907     } else {
908         rrd_set_error("expected timestamp not found in data source from %s",
909                       input);
910         return -1;
911     }
912     *p = '\0';
913     i = 1;
914     updvals[tmpl_idx[i++]] = p + 1;
915     while (*(++p)) {
916         if (*p == ':') {
917             *p = '\0';
918             if (i < tmpl_cnt) {
919                 updvals[tmpl_idx[i++]] = p + 1;
920             }
921             else {
922                 rrd_set_error("found extra data on update argument: %s",p+1);
923                 return -1;
924             }                
925         }
926     }
927
928     if (i != tmpl_cnt) {
929         rrd_set_error("expected %lu data source readings (got %lu) from %s",
930                       tmpl_cnt - 1, i - 1, input);
931         return -1;
932     }
933
934     if (get_time_from_reading(rrd, timesyntax, updvals,
935                               current_time, current_time_usec,
936                               version) == -1) {
937         return -1;
938     }
939     return 0;
940 }
941
942 /*
943  * Parse the time in a DS string, store it in current_time and 
944  * current_time_usec and verify that it's later than the last
945  * update for this DS.
946  *
947  * Returns 0 on success, -1 on error.
948  */
949 static int get_time_from_reading(
950     rrd_t *rrd,
951     char timesyntax,
952     char **updvals,
953     time_t *current_time,
954     unsigned long *current_time_usec,
955     int version)
956 {
957     double    tmp;
958     char     *parsetime_error = NULL;
959     char     *old_locale;
960     rrd_time_value_t ds_tv;
961     struct timeval tmp_time;    /* used for time conversion */
962
963     /* get the time from the reading ... handle N */
964     if (timesyntax == '@') {    /* at-style */
965         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
966             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
967             return -1;
968         }
969         if (ds_tv.type == RELATIVE_TO_END_TIME ||
970             ds_tv.type == RELATIVE_TO_START_TIME) {
971             rrd_set_error("specifying time relative to the 'start' "
972                           "or 'end' makes no sense here: %s", updvals[0]);
973             return -1;
974         }
975         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
976         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
977     } else if (strcmp(updvals[0], "N") == 0) {
978         gettimeofday(&tmp_time, 0);
979         normalize_time(&tmp_time);
980         *current_time = tmp_time.tv_sec;
981         *current_time_usec = tmp_time.tv_usec;
982     } else {
983         old_locale = setlocale(LC_NUMERIC, NULL);
984         setlocale(LC_NUMERIC, "C");
985         errno = 0;
986         tmp = strtod(updvals[0], 0);
987         if (errno > 0) {
988             rrd_set_error("converting '%s' to float: %s",
989                 updvals[0], rrd_strerror(errno));
990             return -1;
991         };
992         setlocale(LC_NUMERIC, old_locale);
993         if (tmp < 0.0){
994             gettimeofday(&tmp_time, 0);
995             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
996         }
997
998         *current_time = floor(tmp);
999         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1000     }
1001     /* dont do any correction for old version RRDs */
1002     if (version < 3)
1003         *current_time_usec = 0;
1004
1005     if (*current_time < rrd->live_head->last_up ||
1006         (*current_time == rrd->live_head->last_up &&
1007          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1008         rrd_set_error("illegal attempt to update using time %ld when "
1009                       "last update time is %ld (minimum one second step)",
1010                       *current_time, rrd->live_head->last_up);
1011         return -1;
1012     }
1013     return 0;
1014 }
1015
1016 /*
1017  * Update pdp_new by interpreting the updvals according to the DS type
1018  * (COUNTER, GAUGE, etc.).
1019  *
1020  * Returns 0 on success, -1 on error.
1021  */
1022 static int update_pdp_prep(
1023     rrd_t *rrd,
1024     char **updvals,
1025     rrd_value_t *pdp_new,
1026     double interval)
1027 {
1028     unsigned long ds_idx;
1029     int       ii;
1030     char     *endptr;   /* used in the conversion */
1031     double    rate;
1032     char     *old_locale;
1033     enum dst_en dst_idx;
1034
1035     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1036         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1037
1038         /* make sure we do not build diffs with old last_ds values */
1039         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1040             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1041             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1042         }
1043
1044         /* NOTE: DST_CDEF should never enter this if block, because
1045          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1046          * accidently specified a value for the DST_CDEF. To handle this case,
1047          * an extra check is required. */
1048
1049         if ((updvals[ds_idx + 1][0] != 'U') &&
1050             (dst_idx != DST_CDEF) &&
1051             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1052             rate = DNAN;
1053
1054             /* pdp_new contains rate * time ... eg the bytes transferred during
1055              * the interval. Doing it this way saves a lot of math operations
1056              */
1057             switch (dst_idx) {
1058             case DST_COUNTER:
1059             case DST_DERIVE:
1060                 /* Check if this is a valid integer. `U' is already handled in
1061                  * another branch. */
1062                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1063                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1064                             && (updvals[ds_idx + 1][ii] == '-'))
1065                         continue;
1066
1067                     if ((updvals[ds_idx + 1][ii] < '0')
1068                             || (updvals[ds_idx + 1][ii] > '9')) {
1069                         rrd_set_error("not a simple %s integer: '%s'",
1070                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1071                                 updvals[ds_idx + 1]);
1072                         return -1;
1073                     }
1074                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1075
1076                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1077                     pdp_new[ds_idx] =
1078                         rrd_diff(updvals[ds_idx + 1],
1079                                  rrd->pdp_prep[ds_idx].last_ds);
1080                     if (dst_idx == DST_COUNTER) {
1081                         /* simple overflow catcher. This will fail
1082                          * terribly for non 32 or 64 bit counters
1083                          * ... are there any others in SNMP land?
1084                          */
1085                         if (pdp_new[ds_idx] < (double) 0.0)
1086                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1087                         if (pdp_new[ds_idx] < (double) 0.0)
1088                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1089                     }
1090                     rate = pdp_new[ds_idx] / interval;
1091                 } else {
1092                     pdp_new[ds_idx] = DNAN;
1093                 }
1094                 break;
1095             case DST_ABSOLUTE:
1096                 old_locale = setlocale(LC_NUMERIC, NULL);
1097                 setlocale(LC_NUMERIC, "C");
1098                 errno = 0;
1099                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1100                 if (errno > 0) {
1101                     rrd_set_error("converting '%s' to float: %s",
1102                                   updvals[ds_idx + 1], rrd_strerror(errno));
1103                     return -1;
1104                 };
1105                 setlocale(LC_NUMERIC, old_locale);
1106                 if (endptr[0] != '\0') {
1107                     rrd_set_error
1108                         ("conversion of '%s' to float not complete: tail '%s'",
1109                          updvals[ds_idx + 1], endptr);
1110                     return -1;
1111                 }
1112                 rate = pdp_new[ds_idx] / interval;
1113                 break;
1114             case DST_GAUGE:
1115                 old_locale = setlocale(LC_NUMERIC, NULL);
1116                 setlocale(LC_NUMERIC, "C");
1117                 errno = 0;
1118                 pdp_new[ds_idx] =
1119                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1120                 if (errno) {
1121                     rrd_set_error("converting '%s' to float: %s",
1122                                   updvals[ds_idx + 1], rrd_strerror(errno));
1123                     return -1;
1124                 };
1125                 setlocale(LC_NUMERIC, old_locale);
1126                 if (endptr[0] != '\0') {
1127                     rrd_set_error
1128                         ("conversion of '%s' to float not complete: tail '%s'",
1129                          updvals[ds_idx + 1], endptr);
1130                     return -1;
1131                 }
1132                 rate = pdp_new[ds_idx] / interval;
1133                 break;
1134             default:
1135                 rrd_set_error("rrd contains unknown DS type : '%s'",
1136                               rrd->ds_def[ds_idx].dst);
1137                 return -1;
1138             }
1139             /* break out of this for loop if the error string is set */
1140             if (rrd_test_error()) {
1141                 return -1;
1142             }
1143             /* make sure pdp_temp is neither too large or too small
1144              * if any of these occur it becomes unknown ...
1145              * sorry folks ... */
1146             if (!isnan(rate) &&
1147                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1148                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1149                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1150                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1151                 pdp_new[ds_idx] = DNAN;
1152             }
1153         } else {
1154             /* no news is news all the same */
1155             pdp_new[ds_idx] = DNAN;
1156         }
1157
1158
1159         /* make a copy of the command line argument for the next run */
1160 #ifdef DEBUG
1161         fprintf(stderr, "prep ds[%lu]\t"
1162                 "last_arg '%s'\t"
1163                 "this_arg '%s'\t"
1164                 "pdp_new %10.2f\n",
1165                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166                 pdp_new[ds_idx]);
1167 #endif
1168         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1169                 LAST_DS_LEN - 1);
1170         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1171     }
1172     return 0;
1173 }
1174
1175 /*
1176  * How many PDP steps have elapsed since the last update? Returns the answer,
1177  * and stores the time between the last update and the last PDP in pre_time,
1178  * and the time between the last PDP and the current time in post_int.
1179  */
1180 static int calculate_elapsed_steps(
1181     rrd_t *rrd,
1182     unsigned long current_time,
1183     unsigned long current_time_usec,
1184     double interval,
1185     double *pre_int,
1186     double *post_int,
1187     unsigned long *proc_pdp_cnt)
1188 {
1189     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1190     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1191                                  * time */
1192     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1193                                  * when it was last updated */
1194     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1195
1196     /* when was the current pdp started */
1197     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1198     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1199
1200     /* when did the last pdp_st occur */
1201     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1202     occu_pdp_st = current_time - occu_pdp_age;
1203
1204     if (occu_pdp_st > proc_pdp_st) {
1205         /* OK we passed the pdp_st moment */
1206         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1207                                                                      * occurred before the latest
1208                                                                      * pdp_st moment*/
1209         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1210         *post_int = occu_pdp_age;   /* how much after it */
1211         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1212     } else {
1213         *pre_int = interval;
1214         *post_int = 0;
1215     }
1216
1217     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1218
1219 #ifdef DEBUG
1220     printf("proc_pdp_age %lu\t"
1221            "proc_pdp_st %lu\t"
1222            "occu_pfp_age %lu\t"
1223            "occu_pdp_st %lu\t"
1224            "int %lf\t"
1225            "pre_int %lf\t"
1226            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1227            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1228 #endif
1229
1230     /* compute the number of elapsed pdp_st moments */
1231     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1232 }
1233
1234 /*
1235  * Increment the PDP values by the values in pdp_new, or else initialize them.
1236  */
1237 static void simple_update(
1238     rrd_t *rrd,
1239     double interval,
1240     rrd_value_t *pdp_new)
1241 {
1242     int       i;
1243
1244     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1245         if (isnan(pdp_new[i])) {
1246             /* this is not really accurate if we use subsecond data arrival time
1247                should have thought of it when going subsecond resolution ...
1248                sorry next format change we will have it! */
1249             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1250                 floor(interval);
1251         } else {
1252             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1253                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1254             } else {
1255                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1256             }
1257         }
1258 #ifdef DEBUG
1259         fprintf(stderr,
1260                 "NO PDP  ds[%i]\t"
1261                 "value %10.2f\t"
1262                 "unkn_sec %5lu\n",
1263                 i,
1264                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1265                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1266 #endif
1267     }
1268 }
1269
1270 /*
1271  * Call process_pdp_st for each DS.
1272  *
1273  * Returns 0 on success, -1 on error.
1274  */
1275 static int process_all_pdp_st(
1276     rrd_t *rrd,
1277     double interval,
1278     double pre_int,
1279     double post_int,
1280     unsigned long elapsed_pdp_st,
1281     rrd_value_t *pdp_new,
1282     rrd_value_t *pdp_temp)
1283 {
1284     unsigned long ds_idx;
1285
1286     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1287        rate*seconds which occurred up to the last run.
1288        pdp_new[] contains rate*seconds from the latest run.
1289        pdp_temp[] will contain the rate for cdp */
1290
1291     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1292         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1293                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1294                            pdp_new, pdp_temp) == -1) {
1295             return -1;
1296         }
1297 #ifdef DEBUG
1298         fprintf(stderr, "PDP UPD ds[%lu]\t"
1299                 "elapsed_pdp_st %lu\t"
1300                 "pdp_temp %10.2f\t"
1301                 "new_prep %10.2f\t"
1302                 "new_unkn_sec %5lu\n",
1303                 ds_idx,
1304                 elapsed_pdp_st,
1305                 pdp_temp[ds_idx],
1306                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1307                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1308 #endif
1309     }
1310     return 0;
1311 }
1312
1313 /*
1314  * Process an update that occurs after one of the PDP moments.
1315  * Increments the PDP value, sets NAN if time greater than the
1316  * heartbeats have elapsed, processes CDEFs.
1317  *
1318  * Returns 0 on success, -1 on error.
1319  */
1320 static int process_pdp_st(
1321     rrd_t *rrd,
1322     unsigned long ds_idx,
1323     double interval,
1324     double pre_int,
1325     double post_int,
1326     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1327     rrd_value_t *pdp_new,
1328     rrd_value_t *pdp_temp)
1329 {
1330     int       i;
1331
1332     /* update pdp_prep to the current pdp_st. */
1333     double    pre_unknown = 0.0;
1334     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1335     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1336
1337     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1338
1339     rpnstack_init(&rpnstack);
1340
1341
1342     if (isnan(pdp_new[ds_idx])) {
1343         /* a final bit of unknown to be added before calculation
1344            we use a temporary variable for this so that we
1345            don't have to turn integer lines before using the value */
1346         pre_unknown = pre_int;
1347     } else {
1348         if (isnan(scratch[PDP_val].u_val)) {
1349             scratch[PDP_val].u_val = 0;
1350         }
1351         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1352     }
1353
1354     /* if too much of the pdp_prep is unknown we dump it */
1355     /* if the interval is larger thatn mrhb we get NAN */
1356     if ((interval > mrhb) ||
1357         (rrd->stat_head->pdp_step / 2.0 <
1358          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1359         pdp_temp[ds_idx] = DNAN;
1360     } else {
1361         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1362             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1363              pre_unknown);
1364     }
1365
1366     /* process CDEF data sources; remember each CDEF DS can
1367      * only reference other DS with a lower index number */
1368     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1369         rpnp_t   *rpnp;
1370
1371         rpnp =
1372             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1373         if(rpnp == NULL) {
1374           rpnstack_free(&rpnstack);
1375           return -1;
1376         }
1377         /* substitute data values for OP_VARIABLE nodes */
1378         for (i = 0; rpnp[i].op != OP_END; i++) {
1379             if (rpnp[i].op == OP_VARIABLE) {
1380                 rpnp[i].op = OP_NUMBER;
1381                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1382             }
1383         }
1384         /* run the rpn calculator */
1385         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1386             free(rpnp);
1387             rpnstack_free(&rpnstack);
1388             return -1;
1389         }
1390         free(rpnp);
1391     }
1392
1393     /* make pdp_prep ready for the next run */
1394     if (isnan(pdp_new[ds_idx])) {
1395         /* this is not realy accurate if we use subsecond data arival time
1396            should have thought of it when going subsecond resolution ...
1397            sorry next format change we will have it! */
1398         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1399         scratch[PDP_val].u_val = DNAN;
1400     } else {
1401         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1402         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1403     }
1404     rpnstack_free(&rpnstack);
1405     return 0;
1406 }
1407
1408 /*
1409  * Iterate over all the RRAs for a given DS and:
1410  * 1. Decide whether to schedule a smooth later
1411  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1412  * 3. Update the CDP
1413  *
1414  * Returns 0 on success, -1 on error
1415  */
1416 static int update_all_cdp_prep(
1417     rrd_t *rrd,
1418     unsigned long *rra_step_cnt,
1419     unsigned long rra_begin,
1420     rrd_file_t *rrd_file,
1421     unsigned long elapsed_pdp_st,
1422     unsigned long proc_pdp_cnt,
1423     rrd_value_t **last_seasonal_coef,
1424     rrd_value_t **seasonal_coef,
1425     rrd_value_t *pdp_temp,
1426     unsigned long *skip_update,
1427     int *schedule_smooth)
1428 {
1429     unsigned long rra_idx;
1430
1431     /* index into the CDP scratch array */
1432     enum cf_en current_cf;
1433     unsigned long rra_start;
1434
1435     /* number of rows to be updated in an RRA for a data value. */
1436     unsigned long start_pdp_offset;
1437
1438     rra_start = rra_begin;
1439     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1440         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1441         start_pdp_offset =
1442             rrd->rra_def[rra_idx].pdp_cnt -
1443             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1444         skip_update[rra_idx] = 0;
1445         if (start_pdp_offset <= elapsed_pdp_st) {
1446             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1447                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1448         } else {
1449             rra_step_cnt[rra_idx] = 0;
1450         }
1451
1452         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1453             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1454              * so that they will be correct for the next observed value; note that for
1455              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1456              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1457             if (rra_step_cnt[rra_idx] > 1) {
1458                 skip_update[rra_idx] = 1;
1459                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1460                                 elapsed_pdp_st, last_seasonal_coef);
1461                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1462                                 elapsed_pdp_st + 1, seasonal_coef);
1463             }
1464             /* periodically run a smoother for seasonal effects */
1465             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1466 #ifdef DEBUG
1467                 fprintf(stderr,
1468                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1469                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1470                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1471                         u_cnt);
1472 #endif
1473                 *schedule_smooth = 1;
1474             }
1475         }
1476         if (rrd_test_error())
1477             return -1;
1478
1479         if (update_cdp_prep
1480             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1481              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1482              current_cf) == -1) {
1483             return -1;
1484         }
1485         rra_start +=
1486             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1487             sizeof(rrd_value_t);
1488     }
1489     return 0;
1490 }
1491
1492 /* 
1493  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1494  */
1495 static int do_schedule_smooth(
1496     rrd_t *rrd,
1497     unsigned long rra_idx,
1498     unsigned long elapsed_pdp_st)
1499 {
1500     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1501     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1502     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1503     unsigned long seasonal_smooth_idx =
1504         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1505     unsigned long *init_seasonal =
1506         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1507
1508     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1509      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1510      * really an RRA level, not a data source within RRA level parameter, but
1511      * the rra_def is read only for rrd_update (not flushed to disk). */
1512     if (*init_seasonal > BURNIN_CYCLES) {
1513         /* someone has no doubt invented a trick to deal with this wrap around,
1514          * but at least this code is clear. */
1515         if (seasonal_smooth_idx > cur_row) {
1516             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1517              * between PDP and CDP */
1518             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1519         }
1520         /* can't rely on negative numbers because we are working with
1521          * unsigned values */
1522         return (cur_row + elapsed_pdp_st >= row_cnt
1523                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1524     }
1525     /* mark off one of the burn-in cycles */
1526     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1527 }
1528
1529 /*
1530  * For a given RRA, iterate over the data sources and call the appropriate
1531  * consolidation function.
1532  *
1533  * Returns 0 on success, -1 on error.
1534  */
1535 static int update_cdp_prep(
1536     rrd_t *rrd,
1537     unsigned long elapsed_pdp_st,
1538     unsigned long start_pdp_offset,
1539     unsigned long *rra_step_cnt,
1540     int rra_idx,
1541     rrd_value_t *pdp_temp,
1542     rrd_value_t *last_seasonal_coef,
1543     rrd_value_t *seasonal_coef,
1544     int current_cf)
1545 {
1546     unsigned long ds_idx, cdp_idx;
1547
1548     /* update CDP_PREP areas */
1549     /* loop over data soures within each RRA */
1550     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1551
1552         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1553
1554         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1555             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1556                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1557                        elapsed_pdp_st, start_pdp_offset,
1558                        rrd->rra_def[rra_idx].pdp_cnt,
1559                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1560                        rra_idx, ds_idx);
1561         } else {
1562             /* Nothing to consolidate if there's one PDP per CDP. However, if
1563              * we've missed some PDPs, let's update null counters etc. */
1564             if (elapsed_pdp_st > 2) {
1565                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1566                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1567                           (enum cf_en)current_cf);
1568             }
1569         }
1570
1571         if (rrd_test_error())
1572             return -1;
1573     }                   /* endif data sources loop */
1574     return 0;
1575 }
1576
1577 /*
1578  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1579  * primary value, secondary value, and # of unknowns.
1580  */
1581 static void update_cdp(
1582     unival *scratch,
1583     int current_cf,
1584     rrd_value_t pdp_temp_val,
1585     unsigned long rra_step_cnt,
1586     unsigned long elapsed_pdp_st,
1587     unsigned long start_pdp_offset,
1588     unsigned long pdp_cnt,
1589     rrd_value_t xff,
1590     int i,
1591     int ii)
1592 {
1593     /* shorthand variables */
1594     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1595     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1596     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1597     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1598
1599     if (rra_step_cnt) {
1600         /* If we are in this block, as least 1 CDP value will be written to
1601          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1602          * to be written, then the "fill in" value is the CDP_secondary_val
1603          * entry. */
1604         if (isnan(pdp_temp_val)) {
1605             *cdp_unkn_pdp_cnt += start_pdp_offset;
1606             *cdp_secondary_val = DNAN;
1607         } else {
1608             /* CDP_secondary value is the RRA "fill in" value for intermediary
1609              * CDP data entries. No matter the CF, the value is the same because
1610              * the average, max, min, and last of a list of identical values is
1611              * the same, namely, the value itself. */
1612             *cdp_secondary_val = pdp_temp_val;
1613         }
1614
1615         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1616             *cdp_primary_val = DNAN;
1617         } else {
1618             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1619                                start_pdp_offset, pdp_cnt);
1620         }
1621         *cdp_val =
1622             initialize_carry_over(pdp_temp_val,current_cf,
1623                                   elapsed_pdp_st,
1624                                   start_pdp_offset, pdp_cnt);
1625                /* endif meets xff value requirement for a valid value */
1626         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1627          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1628         if (isnan(pdp_temp_val))
1629             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1630         else
1631             *cdp_unkn_pdp_cnt = 0;
1632     } else {            /* rra_step_cnt[i]  == 0 */
1633
1634 #ifdef DEBUG
1635         if (isnan(*cdp_val)) {
1636             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1637                     i, ii);
1638         } else {
1639             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1640                     i, ii, *cdp_val);
1641         }
1642 #endif
1643         if (isnan(pdp_temp_val)) {
1644             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1645         } else {
1646             *cdp_val =
1647                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1648                                   current_cf, i, ii);
1649         }
1650     }
1651 }
1652
1653 /*
1654  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1655  * on the type of consolidation function.
1656  */
1657 static void initialize_cdp_val(
1658     unival *scratch,
1659     int current_cf,
1660     rrd_value_t pdp_temp_val,
1661     unsigned long start_pdp_offset,
1662     unsigned long pdp_cnt)
1663 {
1664     rrd_value_t cum_val, cur_val;
1665
1666     switch (current_cf) {
1667     case CF_AVERAGE:
1668         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1669         cur_val = IFDNAN(pdp_temp_val, 0.0);
1670         scratch[CDP_primary_val].u_val =
1671             (cum_val + cur_val * start_pdp_offset) /
1672             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1673         break;
1674     case CF_MAXIMUM: 
1675         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1676         cur_val = IFDNAN(pdp_temp_val, -DINF);
1677
1678 #if 0
1679 #ifdef DEBUG
1680         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1681             fprintf(stderr,
1682                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1683                     i, ii);
1684             exit(-1);
1685         }
1686 #endif
1687 #endif
1688         if (cur_val > cum_val)
1689             scratch[CDP_primary_val].u_val = cur_val;
1690         else
1691             scratch[CDP_primary_val].u_val = cum_val;
1692         break;
1693     case CF_MINIMUM:
1694         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1695         cur_val = IFDNAN(pdp_temp_val, DINF);
1696 #if 0
1697 #ifdef DEBUG
1698         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699             fprintf(stderr,
1700                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1701                     ii);
1702             exit(-1);
1703         }
1704 #endif
1705 #endif
1706         if (cur_val < cum_val)
1707             scratch[CDP_primary_val].u_val = cur_val;
1708         else
1709             scratch[CDP_primary_val].u_val = cum_val;
1710         break;
1711     case CF_LAST:
1712     default:
1713         scratch[CDP_primary_val].u_val = pdp_temp_val;
1714         break;
1715     }
1716 }
1717
1718 /*
1719  * Update the consolidation function for Holt-Winters functions as
1720  * well as other functions that don't actually consolidate multiple
1721  * PDPs.
1722  */
1723 static void reset_cdp(
1724     rrd_t *rrd,
1725     unsigned long elapsed_pdp_st,
1726     rrd_value_t *pdp_temp,
1727     rrd_value_t *last_seasonal_coef,
1728     rrd_value_t *seasonal_coef,
1729     int rra_idx,
1730     int ds_idx,
1731     int cdp_idx,
1732     enum cf_en current_cf)
1733 {
1734     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1735
1736     switch (current_cf) {
1737     case CF_AVERAGE:
1738     default:
1739         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1740         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1741         break;
1742     case CF_SEASONAL:
1743     case CF_DEVSEASONAL:
1744         /* need to update cached seasonal values, so they are consistent
1745          * with the bulk update */
1746         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1747          * CDP_last_deviation are the same. */
1748         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1749         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1750         break;
1751     case CF_HWPREDICT:
1752     case CF_MHWPREDICT:
1753         /* need to update the null_count and last_null_count.
1754          * even do this for non-DNAN pdp_temp because the
1755          * algorithm is not learning from batch updates. */
1756         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1757         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1758         /* fall through */
1759     case CF_DEVPREDICT:
1760         scratch[CDP_primary_val].u_val = DNAN;
1761         scratch[CDP_secondary_val].u_val = DNAN;
1762         break;
1763     case CF_FAILURES:
1764         /* do not count missed bulk values as failures */
1765         scratch[CDP_primary_val].u_val = 0;
1766         scratch[CDP_secondary_val].u_val = 0;
1767         /* need to reset violations buffer.
1768          * could do this more carefully, but for now, just
1769          * assume a bulk update wipes away all violations. */
1770         erase_violations(rrd, cdp_idx, rra_idx);
1771         break;
1772     }
1773 }
1774
1775 static rrd_value_t initialize_carry_over(
1776     rrd_value_t pdp_temp_val,
1777     int current_cf,
1778     unsigned long elapsed_pdp_st,
1779     unsigned long start_pdp_offset,
1780     unsigned long pdp_cnt)
1781 {
1782     unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1783     if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1784         switch (current_cf) {
1785         case CF_MAXIMUM:
1786             return -DINF;
1787         case CF_MINIMUM:
1788             return DINF;
1789         case CF_AVERAGE:
1790             return 0;
1791         default:
1792             return DNAN;
1793         }        
1794     } 
1795     else {
1796         switch (current_cf) {
1797         case CF_AVERAGE:
1798             return pdp_temp_val *  pdp_into_cdp_cnt ;
1799         default:
1800             return pdp_temp_val;
1801         }        
1802     }        
1803 }
1804
1805 /*
1806  * Update or initialize a CDP value based on the consolidation
1807  * function.
1808  *
1809  * Returns the new value.
1810  */
1811 static rrd_value_t calculate_cdp_val(
1812     rrd_value_t cdp_val,
1813     rrd_value_t pdp_temp_val,
1814     unsigned long elapsed_pdp_st,
1815     int current_cf,
1816 #ifdef DEBUG
1817     int i,
1818     int ii
1819 #else
1820     int UNUSED(i),
1821     int UNUSED(ii)
1822 #endif
1823     )
1824 {
1825     if (isnan(cdp_val)) {
1826         if (current_cf == CF_AVERAGE) {
1827             pdp_temp_val *= elapsed_pdp_st;
1828         }
1829 #ifdef DEBUG
1830         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1831                 i, ii, pdp_temp_val);
1832 #endif
1833         return pdp_temp_val;
1834     }
1835     if (current_cf == CF_AVERAGE)
1836         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1837     if (current_cf == CF_MINIMUM)
1838         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1839     if (current_cf == CF_MAXIMUM)
1840         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1841
1842     return pdp_temp_val;
1843 }
1844
1845 /*
1846  * For each RRA, update the seasonal values and then call update_aberrant_CF
1847  * for each data source.
1848  *
1849  * Return 0 on success, -1 on error.
1850  */
1851 static int update_aberrant_cdps(
1852     rrd_t *rrd,
1853     rrd_file_t *rrd_file,
1854     unsigned long rra_begin,
1855     unsigned long elapsed_pdp_st,
1856     rrd_value_t *pdp_temp,
1857     rrd_value_t **seasonal_coef)
1858 {
1859     unsigned long rra_idx, ds_idx, j;
1860
1861     /* number of PDP steps since the last update that
1862      * are assigned to the first CDP to be generated
1863      * since the last update. */
1864     unsigned short scratch_idx;
1865     unsigned long rra_start;
1866     enum cf_en current_cf;
1867
1868     /* this loop is only entered if elapsed_pdp_st < 3 */
1869     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1870          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1871         rra_start = rra_begin;
1872         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1873             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1874                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1875                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1876                     if (scratch_idx == CDP_primary_val) {
1877                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1878                                         elapsed_pdp_st + 1, seasonal_coef);
1879                     } else {
1880                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1881                                         elapsed_pdp_st + 2, seasonal_coef);
1882                     }
1883                 }
1884                 if (rrd_test_error())
1885                     return -1;
1886                 /* loop over data soures within each RRA */
1887                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1888                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1889                                        rra_idx * (rrd->stat_head->ds_cnt) +
1890                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1891                                        *seasonal_coef);
1892                 }
1893             }
1894             rra_start += rrd->rra_def[rra_idx].row_cnt
1895                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1896         }
1897     }
1898     return 0;
1899 }
1900
1901 /* 
1902  * Move sequentially through the file, writing one RRA at a time.  Note this
1903  * architecture divorces the computation of CDP with flushing updated RRA
1904  * entries to disk.
1905  *
1906  * Return 0 on success, -1 on error.
1907  */
1908 static int write_to_rras(
1909     rrd_t *rrd,
1910     rrd_file_t *rrd_file,
1911     unsigned long *rra_step_cnt,
1912     unsigned long rra_begin,
1913     time_t current_time,
1914     unsigned long *skip_update,
1915     rrd_info_t ** pcdp_summary)
1916 {
1917     unsigned long rra_idx;
1918     unsigned long rra_start;
1919     time_t    rra_time = 0; /* time of update for a RRA */
1920
1921     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1922     
1923     /* Ready to write to disk */
1924     rra_start = rra_begin;
1925
1926     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1927         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1928         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1929
1930         /* for cdp_prep */
1931         unsigned short scratch_idx;
1932         unsigned long step_subtract;
1933
1934         for (scratch_idx = CDP_primary_val,
1935                  step_subtract = 1;
1936              rra_step_cnt[rra_idx] > 0;
1937              rra_step_cnt[rra_idx]--,
1938                  scratch_idx = CDP_secondary_val,
1939                  step_subtract = 2) {
1940
1941             size_t rra_pos_new;
1942 #ifdef DEBUG
1943             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1944 #endif
1945             /* increment, with wrap-around */
1946             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1947               rra_ptr->cur_row = 0;
1948
1949             /* we know what our position should be */
1950             rra_pos_new = rra_start
1951               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1952
1953             /* re-seek if the position is wrong or we wrapped around */
1954             if ((size_t)rra_pos_new != rrd_file->pos) {
1955                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1956                     rrd_set_error("seek error in rrd");
1957                     return -1;
1958                 }
1959             }
1960 #ifdef DEBUG
1961             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1962 #endif
1963
1964             if (skip_update[rra_idx])
1965                 continue;
1966
1967             if (*pcdp_summary != NULL) {
1968                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1969
1970                 rra_time = (current_time - current_time % step_time)
1971                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1972             }
1973
1974             if (write_RRA_row
1975                 (rrd_file, rrd, rra_idx, scratch_idx,
1976                  pcdp_summary, rra_time) == -1)
1977                 return -1;
1978
1979             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1980         }
1981
1982         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1983     } /* RRA LOOP */
1984
1985     return 0;
1986 }
1987
1988 /*
1989  * Write out one row of values (one value per DS) to the archive.
1990  *
1991  * Returns 0 on success, -1 on error.
1992  */
1993 static int write_RRA_row(
1994     rrd_file_t *rrd_file,
1995     rrd_t *rrd,
1996     unsigned long rra_idx,
1997     unsigned short CDP_scratch_idx,
1998     rrd_info_t ** pcdp_summary,
1999     time_t rra_time)
2000 {
2001     unsigned long ds_idx, cdp_idx;
2002     rrd_infoval_t iv;
2003
2004     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2005         /* compute the cdp index */
2006         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2007 #ifdef DEBUG
2008         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2009                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2010                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2011 #endif
2012         if (*pcdp_summary != NULL) {
2013             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2014             /* append info to the return hash */
2015             *pcdp_summary = rrd_info_push(*pcdp_summary,
2016                                           sprintf_alloc
2017                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
2018                                            (long long)rra_time,
2019                                            rrd->rra_def[rra_idx].cf_nam,
2020                                            rrd->rra_def[rra_idx].pdp_cnt,
2021                                            rrd->ds_def[ds_idx].ds_nam),
2022                                            RD_I_VAL, iv);
2023         }
2024         errno = 0;
2025         if (rrd_write(rrd_file,
2026                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2027                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2028             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2029             return -1;
2030         }
2031     }
2032     return 0;
2033 }
2034
2035 /*
2036  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2037  *
2038  * Returns 0 on success, -1 otherwise
2039  */
2040 static int smooth_all_rras(
2041     rrd_t *rrd,
2042     rrd_file_t *rrd_file,
2043     unsigned long rra_begin)
2044 {
2045     unsigned long rra_start = rra_begin;
2046     unsigned long rra_idx;
2047
2048     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2049         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2050             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2051 #ifdef DEBUG
2052             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2053 #endif
2054             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2055             if (rrd_test_error())
2056                 return -1;
2057         }
2058         rra_start += rrd->rra_def[rra_idx].row_cnt
2059             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2060     }
2061     return 0;
2062 }
2063
2064 #ifndef HAVE_MMAP
2065 /*
2066  * Flush changes to disk (unless we're using mmap)
2067  *
2068  * Returns 0 on success, -1 otherwise
2069  */
2070 static int write_changes_to_disk(
2071     rrd_t *rrd,
2072     rrd_file_t *rrd_file,
2073     int version)
2074 {
2075     /* we just need to write back the live header portion now */
2076     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2077                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2078                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2079                  SEEK_SET) != 0) {
2080         rrd_set_error("seek rrd for live header writeback");
2081         return -1;
2082     }
2083     if (version >= 3) {
2084         if (rrd_write(rrd_file, rrd->live_head,
2085                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2086             rrd_set_error("rrd_write live_head to rrd");
2087             return -1;
2088         }
2089     } else {
2090         if (rrd_write(rrd_file, rrd->legacy_last_up,
2091                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2092             rrd_set_error("rrd_write live_head to rrd");
2093             return -1;
2094         }
2095     }
2096
2097
2098     if (rrd_write(rrd_file, rrd->pdp_prep,
2099                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2100         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2101         rrd_set_error("rrd_write pdp_prep to rrd");
2102         return -1;
2103     }
2104
2105     if (rrd_write(rrd_file, rrd->cdp_prep,
2106                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2107                   rrd->stat_head->ds_cnt)
2108         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2109                       rrd->stat_head->ds_cnt)) {
2110
2111         rrd_set_error("rrd_write cdp_prep to rrd");
2112         return -1;
2113     }
2114
2115     if (rrd_write(rrd_file, rrd->rra_ptr,
2116                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2117         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2118         rrd_set_error("rrd_write rra_ptr to rrd");
2119         return -1;
2120     }
2121     return 0;
2122 }
2123 #endif