prepare for the release of rrdtool-1.4.1
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.1  Copyright by Tobi Oetiker, 1997-2009
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #include "rrd_client.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
46
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
51
52     struct _timeb current_time;
53
54     _ftime(&current_time);
55
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
58
59     return 0;
60 }
61
62 #endif
63
64 /* FUNCTION PROTOTYPES */
65
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
77
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
88
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
94
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
112
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
122
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
130
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
136
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
145
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
150
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
159
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
169
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
251
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
260
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
268
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
273
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
280
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
287 {
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
292 }
293
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
302 {
303     struct timeval tmp_time;    /* used for time conversion */
304
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
313 }
314
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
316
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
320 {
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
329
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
333
334     while (1) {
335         int       option_index = 0;
336         int       opt;
337
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339
340         if (opt == EOF)
341             break;
342
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
347
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
353
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
362
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
376 }
377
378 int rrd_update(
379     int argc,
380     char **argv)
381 {
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
392
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
395
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
398
399         if (opt == EOF)
400             break;
401
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
406
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
417
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
423
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
429
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) return status;
433     }
434
435     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436     {
437         rrd_set_error("The caching daemon cannot be used together with "
438                 "templates yet.");
439         goto out;
440     }
441
442     if (! rrdc_is_connected(opt_daemon))
443     {
444       rc = rrd_update_r(argv[optind], tmplt,
445                         argc - optind - 1, (const char **) (argv + optind + 1));
446     }
447     else /* we are connected */
448     {
449         rc = rrdc_update (argv[optind], /* file */
450                           argc - optind - 1, /* values_num */
451                           (const char *const *) (argv + optind + 1)); /* values */
452         if (rc > 0)
453             rrd_set_error("Failed sending the values to rrdcached: %s",
454                           rrd_strerror (rc));
455     }
456
457   out:
458     if (tmplt != NULL)
459     {
460         free(tmplt);
461         tmplt = NULL;
462     }
463     if (opt_daemon != NULL)
464     {
465         free (opt_daemon);
466         opt_daemon = NULL;
467     }
468     return rc;
469 }
470
471 int rrd_update_r(
472     const char *filename,
473     const char *tmplt,
474     int argc,
475     const char **argv)
476 {
477     return _rrd_update(filename, tmplt, argc, argv, NULL);
478 }
479
480 int _rrd_update(
481     const char *filename,
482     const char *tmplt,
483     int argc,
484     const char **argv,
485     rrd_info_t * pcdp_summary)
486 {
487
488     int       arg_i = 2;
489
490     unsigned long rra_begin;    /* byte pointer to the rra
491                                  * area in the rrd file.  this
492                                  * pointer never changes value */
493     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
494                              * to the existing entry */
495     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
496                              * to the cdp values */
497
498     long     *tmpl_idx; /* index representing the settings
499                          * transported by the tmplt index */
500     unsigned long tmpl_cnt = 2; /* time and data */
501     rrd_t     rrd;
502     time_t    current_time = 0;
503     unsigned long current_time_usec = 0;    /* microseconds part of current time */
504     char    **updvals;
505     int       schedule_smooth = 0;
506
507     /* number of elapsed PDP steps since last update */
508     unsigned long *rra_step_cnt = NULL;
509
510     int       version;  /* rrd version */
511     rrd_file_t *rrd_file;
512     char     *arg_copy; /* for processing the argv */
513     unsigned long *skip_update; /* RRAs to advance but not write */
514
515     /* need at least 1 arguments: data. */
516     if (argc < 1) {
517         rrd_set_error("Not enough arguments");
518         goto err_out;
519     }
520
521     rrd_init(&rrd);
522     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
523         goto err_free;
524     }
525     /* We are now at the beginning of the rra's */
526     rra_begin = rrd_file->header_len;
527
528     version = atoi(rrd.stat_head->version);
529
530     initialize_time(&current_time, &current_time_usec, version);
531
532     /* get exclusive lock to whole file.
533      * lock gets removed when we close the file.
534      */
535     if (rrd_lock(rrd_file) != 0) {
536         rrd_set_error("could not lock RRD");
537         goto err_close;
538     }
539
540     if (allocate_data_structures(&rrd, &updvals,
541                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542                                  &rra_step_cnt, &skip_update,
543                                  &pdp_new) == -1) {
544         goto err_close;
545     }
546
547     /* loop through the arguments. */
548     for (arg_i = 0; arg_i < argc; arg_i++) {
549         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550             rrd_set_error("failed duplication argv entry");
551             break;
552         }
553         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554                         &current_time, &current_time_usec, pdp_temp, pdp_new,
555                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556                         &pcdp_summary, version, skip_update,
557                         &schedule_smooth) == -1) {
558             if (rrd_test_error()) { /* Should have error string always here */
559                 char     *save_error;
560
561                 /* Prepend file name to error message */
562                 if ((save_error = strdup(rrd_get_error())) != NULL) {
563                     rrd_set_error("%s: %s", filename, save_error);
564                     free(save_error);
565                 }
566             }
567             free(arg_copy);
568             break;
569         }
570         free(arg_copy);
571     }
572
573     free(rra_step_cnt);
574
575     /* if we got here and if there is an error and if the file has not been
576      * written to, then close things up and return. */
577     if (rrd_test_error()) {
578         goto err_free_structures;
579     }
580 #ifndef HAVE_MMAP
581     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582         goto err_free_structures;
583     }
584 #endif
585
586     /* calling the smoothing code here guarantees at most one smoothing
587      * operation per rrd_update call. Unfortunately, it is possible with bulk
588      * updates, or a long-delayed update for smoothing to occur off-schedule.
589      * This really isn't critical except during the burn-in cycles. */
590     if (schedule_smooth) {
591         smooth_all_rras(&rrd, rrd_file, rra_begin);
592     }
593
594 /*    rrd_dontneed(rrd_file,&rrd); */
595     rrd_free(&rrd);
596     rrd_close(rrd_file);
597
598     free(pdp_new);
599     free(tmpl_idx);
600     free(pdp_temp);
601     free(skip_update);
602     free(updvals);
603     return 0;
604
605   err_free_structures:
606     free(pdp_new);
607     free(tmpl_idx);
608     free(pdp_temp);
609     free(skip_update);
610     free(updvals);
611   err_close:
612     rrd_close(rrd_file);
613   err_free:
614     rrd_free(&rrd);
615   err_out:
616     return -1;
617 }
618
619 /*
620  * Allocate some important arrays used, and initialize the template.
621  *
622  * When it returns, either all of the structures are allocated
623  * or none of them are.
624  *
625  * Returns 0 on success, -1 on error.
626  */
627 static int allocate_data_structures(
628     rrd_t *rrd,
629     char ***updvals,
630     rrd_value_t **pdp_temp,
631     const char *tmplt,
632     long **tmpl_idx,
633     unsigned long *tmpl_cnt,
634     unsigned long **rra_step_cnt,
635     unsigned long **skip_update,
636     rrd_value_t **pdp_new)
637 {
638     unsigned  i, ii;
639     if ((*updvals = (char **) malloc(sizeof(char *)
640                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641         rrd_set_error("allocating updvals pointer array.");
642         return -1;
643     }
644     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
645                                             * rrd->stat_head->ds_cnt)) ==
646         NULL) {
647         rrd_set_error("allocating pdp_temp.");
648         goto err_free_updvals;
649     }
650     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
651                                                  *
652                                                  rrd->stat_head->rra_cnt)) ==
653         NULL) {
654         rrd_set_error("allocating skip_update.");
655         goto err_free_pdp_temp;
656     }
657     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
658                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
659         rrd_set_error("allocating tmpl_idx.");
660         goto err_free_skip_update;
661     }
662     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
663                                                   *
664                                                   (rrd->stat_head->
665                                                    rra_cnt))) == NULL) {
666         rrd_set_error("allocating rra_step_cnt.");
667         goto err_free_tmpl_idx;
668     }
669
670     /* initialize tmplt redirector */
671     /* default config example (assume DS 1 is a CDEF DS)
672        tmpl_idx[0] -> 0; (time)
673        tmpl_idx[1] -> 1; (DS 0)
674        tmpl_idx[2] -> 3; (DS 2)
675        tmpl_idx[3] -> 4; (DS 3) */
676     (*tmpl_idx)[0] = 0; /* time */
677     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
678         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
679             (*tmpl_idx)[ii++] = i;
680     }
681     *tmpl_cnt = ii;
682
683     if (tmplt != NULL) {
684         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
685             goto err_free_rra_step_cnt;
686         }
687     }
688
689     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
690                                            * rrd->stat_head->ds_cnt)) == NULL) {
691         rrd_set_error("allocating pdp_new.");
692         goto err_free_rra_step_cnt;
693     }
694
695     return 0;
696
697   err_free_rra_step_cnt:
698     free(*rra_step_cnt);
699   err_free_tmpl_idx:
700     free(*tmpl_idx);
701   err_free_skip_update:
702     free(*skip_update);
703   err_free_pdp_temp:
704     free(*pdp_temp);
705   err_free_updvals:
706     free(*updvals);
707     return -1;
708 }
709
710 /*
711  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
712  *
713  * Returns 0 on success.
714  */
715 static int parse_template(
716     rrd_t *rrd,
717     const char *tmplt,
718     unsigned long *tmpl_cnt,
719     long *tmpl_idx)
720 {
721     char     *dsname, *tmplt_copy;
722     unsigned int tmpl_len, i;
723     int       ret = 0;
724
725     *tmpl_cnt = 1;      /* the first entry is the time */
726
727     /* we should work on a writeable copy here */
728     if ((tmplt_copy = strdup(tmplt)) == NULL) {
729         rrd_set_error("error copying tmplt '%s'", tmplt);
730         ret = -1;
731         goto out;
732     }
733
734     dsname = tmplt_copy;
735     tmpl_len = strlen(tmplt_copy);
736     for (i = 0; i <= tmpl_len; i++) {
737         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
738             tmplt_copy[i] = '\0';
739             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
740                 rrd_set_error("tmplt contains more DS definitions than RRD");
741                 ret = -1;
742                 goto out_free_tmpl_copy;
743             }
744             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
745                 rrd_set_error("unknown DS name '%s'", dsname);
746                 ret = -1;
747                 goto out_free_tmpl_copy;
748             }
749             /* go to the next entry on the tmplt_copy */
750             if (i < tmpl_len)
751                 dsname = &tmplt_copy[i + 1];
752         }
753     }
754   out_free_tmpl_copy:
755     free(tmplt_copy);
756   out:
757     return ret;
758 }
759
760 /*
761  * Parse an update string, updates the primary data points (PDPs)
762  * and consolidated data points (CDPs), and writes changes to the RRAs.
763  *
764  * Returns 0 on success, -1 on error.
765  */
766 static int process_arg(
767     char *step_start,
768     rrd_t *rrd,
769     rrd_file_t *rrd_file,
770     unsigned long rra_begin,
771     time_t *current_time,
772     unsigned long *current_time_usec,
773     rrd_value_t *pdp_temp,
774     rrd_value_t *pdp_new,
775     unsigned long *rra_step_cnt,
776     char **updvals,
777     long *tmpl_idx,
778     unsigned long tmpl_cnt,
779     rrd_info_t ** pcdp_summary,
780     int version,
781     unsigned long *skip_update,
782     int *schedule_smooth)
783 {
784     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
785
786     /* a vector of future Holt-Winters seasonal coefs */
787     unsigned long elapsed_pdp_st;
788
789     double    interval, pre_int, post_int;  /* interval between this and
790                                              * the last run */
791     unsigned long proc_pdp_cnt;
792
793     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
794                  current_time, current_time_usec, version) == -1) {
795         return -1;
796     }
797
798     interval = (double) (*current_time - rrd->live_head->last_up)
799         + (double) ((long) *current_time_usec -
800                     (long) rrd->live_head->last_up_usec) / 1e6f;
801
802     /* process the data sources and update the pdp_prep 
803      * area accordingly */
804     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
805         return -1;
806     }
807
808     elapsed_pdp_st = calculate_elapsed_steps(rrd,
809                                              *current_time,
810                                              *current_time_usec, interval,
811                                              &pre_int, &post_int,
812                                              &proc_pdp_cnt);
813
814     /* has a pdp_st moment occurred since the last run ? */
815     if (elapsed_pdp_st == 0) {
816         /* no we have not passed a pdp_st moment. therefore update is simple */
817         simple_update(rrd, interval, pdp_new);
818     } else {
819         /* an pdp_st has occurred. */
820         if (process_all_pdp_st(rrd, interval,
821                                pre_int, post_int,
822                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
823             return -1;
824         }
825         if (update_all_cdp_prep(rrd, rra_step_cnt,
826                                 rra_begin, rrd_file,
827                                 elapsed_pdp_st,
828                                 proc_pdp_cnt,
829                                 &last_seasonal_coef,
830                                 &seasonal_coef,
831                                 pdp_temp,
832                                 skip_update, schedule_smooth) == -1) {
833             goto err_free_coefficients;
834         }
835         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
836                                  elapsed_pdp_st, pdp_temp,
837                                  &seasonal_coef) == -1) {
838             goto err_free_coefficients;
839         }
840         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
841                           *current_time, skip_update,
842                           pcdp_summary) == -1) {
843             goto err_free_coefficients;
844         }
845     }                   /* endif a pdp_st has occurred */
846     rrd->live_head->last_up = *current_time;
847     rrd->live_head->last_up_usec = *current_time_usec;
848
849     if (version < 3) {
850         *rrd->legacy_last_up = rrd->live_head->last_up;
851     }
852     free(seasonal_coef);
853     free(last_seasonal_coef);
854     return 0;
855
856   err_free_coefficients:
857     free(seasonal_coef);
858     free(last_seasonal_coef);
859     return -1;
860 }
861
862 /*
863  * Parse a DS string (time + colon-separated values), storing the
864  * results in current_time, current_time_usec, and updvals.
865  *
866  * Returns 0 on success, -1 on error.
867  */
868 static int parse_ds(
869     rrd_t *rrd,
870     char **updvals,
871     long *tmpl_idx,
872     char *input,
873     unsigned long tmpl_cnt,
874     time_t *current_time,
875     unsigned long *current_time_usec,
876     int version)
877 {
878     char     *p;
879     unsigned long i;
880     char      timesyntax;
881
882     updvals[0] = input;
883     /* initialize all ds input to unknown except the first one
884        which has always got to be set */
885     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
886         updvals[i] = "U";
887
888     /* separate all ds elements; first must be examined separately
889        due to alternate time syntax */
890     if ((p = strchr(input, '@')) != NULL) {
891         timesyntax = '@';
892     } else if ((p = strchr(input, ':')) != NULL) {
893         timesyntax = ':';
894     } else {
895         rrd_set_error("expected timestamp not found in data source from %s",
896                       input);
897         return -1;
898     }
899     *p = '\0';
900     i = 1;
901     updvals[tmpl_idx[i++]] = p + 1;
902     while (*(++p)) {
903         if (*p == ':') {
904             *p = '\0';
905             if (i < tmpl_cnt) {
906                 updvals[tmpl_idx[i++]] = p + 1;
907             }
908         }
909     }
910
911     if (i != tmpl_cnt) {
912         rrd_set_error("expected %lu data source readings (got %lu) from %s",
913                       tmpl_cnt - 1, i, input);
914         return -1;
915     }
916
917     if (get_time_from_reading(rrd, timesyntax, updvals,
918                               current_time, current_time_usec,
919                               version) == -1) {
920         return -1;
921     }
922     return 0;
923 }
924
925 /*
926  * Parse the time in a DS string, store it in current_time and 
927  * current_time_usec and verify that it's later than the last
928  * update for this DS.
929  *
930  * Returns 0 on success, -1 on error.
931  */
932 static int get_time_from_reading(
933     rrd_t *rrd,
934     char timesyntax,
935     char **updvals,
936     time_t *current_time,
937     unsigned long *current_time_usec,
938     int version)
939 {
940     double    tmp;
941     char     *parsetime_error = NULL;
942     char     *old_locale;
943     rrd_time_value_t ds_tv;
944     struct timeval tmp_time;    /* used for time conversion */
945
946     /* get the time from the reading ... handle N */
947     if (timesyntax == '@') {    /* at-style */
948         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
949             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
950             return -1;
951         }
952         if (ds_tv.type == RELATIVE_TO_END_TIME ||
953             ds_tv.type == RELATIVE_TO_START_TIME) {
954             rrd_set_error("specifying time relative to the 'start' "
955                           "or 'end' makes no sense here: %s", updvals[0]);
956             return -1;
957         }
958         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
959         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
960     } else if (strcmp(updvals[0], "N") == 0) {
961         gettimeofday(&tmp_time, 0);
962         normalize_time(&tmp_time);
963         *current_time = tmp_time.tv_sec;
964         *current_time_usec = tmp_time.tv_usec;
965     } else {
966         old_locale = setlocale(LC_NUMERIC, "C");
967         errno = 0;
968         tmp = strtod(updvals[0], 0);
969         if (errno > 0) {
970             rrd_set_error("converting '%s' to float: %s",
971                 updvals[0], rrd_strerror(errno));
972             return -1;
973         };
974         setlocale(LC_NUMERIC, old_locale);
975         if (tmp < 0.0){
976             gettimeofday(&tmp_time, 0);
977             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
978         }
979
980         *current_time = floor(tmp);
981         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
982     }
983     /* dont do any correction for old version RRDs */
984     if (version < 3)
985         *current_time_usec = 0;
986
987     if (*current_time < rrd->live_head->last_up ||
988         (*current_time == rrd->live_head->last_up &&
989          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
990         rrd_set_error("illegal attempt to update using time %ld when "
991                       "last update time is %ld (minimum one second step)",
992                       *current_time, rrd->live_head->last_up);
993         return -1;
994     }
995     return 0;
996 }
997
998 /*
999  * Update pdp_new by interpreting the updvals according to the DS type
1000  * (COUNTER, GAUGE, etc.).
1001  *
1002  * Returns 0 on success, -1 on error.
1003  */
1004 static int update_pdp_prep(
1005     rrd_t *rrd,
1006     char **updvals,
1007     rrd_value_t *pdp_new,
1008     double interval)
1009 {
1010     unsigned long ds_idx;
1011     int       ii;
1012     char     *endptr;   /* used in the conversion */
1013     double    rate;
1014     char     *old_locale;
1015     enum dst_en dst_idx;
1016
1017     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1018         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1019
1020         /* make sure we do not build diffs with old last_ds values */
1021         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1022             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1023             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1024         }
1025
1026         /* NOTE: DST_CDEF should never enter this if block, because
1027          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1028          * accidently specified a value for the DST_CDEF. To handle this case,
1029          * an extra check is required. */
1030
1031         if ((updvals[ds_idx + 1][0] != 'U') &&
1032             (dst_idx != DST_CDEF) &&
1033             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1034             rate = DNAN;
1035
1036             /* pdp_new contains rate * time ... eg the bytes transferred during
1037              * the interval. Doing it this way saves a lot of math operations
1038              */
1039             switch (dst_idx) {
1040             case DST_COUNTER:
1041             case DST_DERIVE:
1042                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1043                     if ((updvals[ds_idx + 1][ii] < '0'
1044                          || updvals[ds_idx + 1][ii] > '9')
1045                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1046                         rrd_set_error("not a simple integer: '%s'",
1047                                       updvals[ds_idx + 1]);
1048                         return -1;
1049                     }
1050                 }
1051                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1052                     pdp_new[ds_idx] =
1053                         rrd_diff(updvals[ds_idx + 1],
1054                                  rrd->pdp_prep[ds_idx].last_ds);
1055                     if (dst_idx == DST_COUNTER) {
1056                         /* simple overflow catcher. This will fail
1057                          * terribly for non 32 or 64 bit counters
1058                          * ... are there any others in SNMP land?
1059                          */
1060                         if (pdp_new[ds_idx] < (double) 0.0)
1061                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1062                         if (pdp_new[ds_idx] < (double) 0.0)
1063                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1064                     }
1065                     rate = pdp_new[ds_idx] / interval;
1066                 } else {
1067                     pdp_new[ds_idx] = DNAN;
1068                 }
1069                 break;
1070             case DST_ABSOLUTE:
1071                 old_locale = setlocale(LC_NUMERIC, "C");
1072                 errno = 0;
1073                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1074                 if (errno > 0) {
1075                     rrd_set_error("converting '%s' to float: %s",
1076                                   updvals[ds_idx + 1], rrd_strerror(errno));
1077                     return -1;
1078                 };
1079                 setlocale(LC_NUMERIC, old_locale);
1080                 if (endptr[0] != '\0') {
1081                     rrd_set_error
1082                         ("conversion of '%s' to float not complete: tail '%s'",
1083                          updvals[ds_idx + 1], endptr);
1084                     return -1;
1085                 }
1086                 rate = pdp_new[ds_idx] / interval;
1087                 break;
1088             case DST_GAUGE:
1089                 old_locale = setlocale(LC_NUMERIC, "C");
1090                 errno = 0;
1091                 pdp_new[ds_idx] =
1092                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1093                 if (errno) {
1094                     rrd_set_error("converting '%s' to float: %s",
1095                                   updvals[ds_idx + 1], rrd_strerror(errno));
1096                     return -1;
1097                 };
1098                 setlocale(LC_NUMERIC, old_locale);
1099                 if (endptr[0] != '\0') {
1100                     rrd_set_error
1101                         ("conversion of '%s' to float not complete: tail '%s'",
1102                          updvals[ds_idx + 1], endptr);
1103                     return -1;
1104                 }
1105                 rate = pdp_new[ds_idx] / interval;
1106                 break;
1107             default:
1108                 rrd_set_error("rrd contains unknown DS type : '%s'",
1109                               rrd->ds_def[ds_idx].dst);
1110                 return -1;
1111             }
1112             /* break out of this for loop if the error string is set */
1113             if (rrd_test_error()) {
1114                 return -1;
1115             }
1116             /* make sure pdp_temp is neither too large or too small
1117              * if any of these occur it becomes unknown ...
1118              * sorry folks ... */
1119             if (!isnan(rate) &&
1120                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1121                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1122                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1123                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1124                 pdp_new[ds_idx] = DNAN;
1125             }
1126         } else {
1127             /* no news is news all the same */
1128             pdp_new[ds_idx] = DNAN;
1129         }
1130
1131
1132         /* make a copy of the command line argument for the next run */
1133 #ifdef DEBUG
1134         fprintf(stderr, "prep ds[%lu]\t"
1135                 "last_arg '%s'\t"
1136                 "this_arg '%s'\t"
1137                 "pdp_new %10.2f\n",
1138                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1139                 pdp_new[ds_idx]);
1140 #endif
1141         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1142                 LAST_DS_LEN - 1);
1143         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1144     }
1145     return 0;
1146 }
1147
1148 /*
1149  * How many PDP steps have elapsed since the last update? Returns the answer,
1150  * and stores the time between the last update and the last PDP in pre_time,
1151  * and the time between the last PDP and the current time in post_int.
1152  */
1153 static int calculate_elapsed_steps(
1154     rrd_t *rrd,
1155     unsigned long current_time,
1156     unsigned long current_time_usec,
1157     double interval,
1158     double *pre_int,
1159     double *post_int,
1160     unsigned long *proc_pdp_cnt)
1161 {
1162     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1163     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1164                                  * time */
1165     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1166                                  * when it was last updated */
1167     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1168
1169     /* when was the current pdp started */
1170     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1171     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1172
1173     /* when did the last pdp_st occur */
1174     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1175     occu_pdp_st = current_time - occu_pdp_age;
1176
1177     if (occu_pdp_st > proc_pdp_st) {
1178         /* OK we passed the pdp_st moment */
1179         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1180                                                                      * occurred before the latest
1181                                                                      * pdp_st moment*/
1182         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1183         *post_int = occu_pdp_age;   /* how much after it */
1184         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1185     } else {
1186         *pre_int = interval;
1187         *post_int = 0;
1188     }
1189
1190     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1191
1192 #ifdef DEBUG
1193     printf("proc_pdp_age %lu\t"
1194            "proc_pdp_st %lu\t"
1195            "occu_pfp_age %lu\t"
1196            "occu_pdp_st %lu\t"
1197            "int %lf\t"
1198            "pre_int %lf\t"
1199            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1200            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1201 #endif
1202
1203     /* compute the number of elapsed pdp_st moments */
1204     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1205 }
1206
1207 /*
1208  * Increment the PDP values by the values in pdp_new, or else initialize them.
1209  */
1210 static void simple_update(
1211     rrd_t *rrd,
1212     double interval,
1213     rrd_value_t *pdp_new)
1214 {
1215     int       i;
1216
1217     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1218         if (isnan(pdp_new[i])) {
1219             /* this is not really accurate if we use subsecond data arrival time
1220                should have thought of it when going subsecond resolution ...
1221                sorry next format change we will have it! */
1222             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1223                 floor(interval);
1224         } else {
1225             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1226                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1227             } else {
1228                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1229             }
1230         }
1231 #ifdef DEBUG
1232         fprintf(stderr,
1233                 "NO PDP  ds[%i]\t"
1234                 "value %10.2f\t"
1235                 "unkn_sec %5lu\n",
1236                 i,
1237                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1238                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1239 #endif
1240     }
1241 }
1242
1243 /*
1244  * Call process_pdp_st for each DS.
1245  *
1246  * Returns 0 on success, -1 on error.
1247  */
1248 static int process_all_pdp_st(
1249     rrd_t *rrd,
1250     double interval,
1251     double pre_int,
1252     double post_int,
1253     unsigned long elapsed_pdp_st,
1254     rrd_value_t *pdp_new,
1255     rrd_value_t *pdp_temp)
1256 {
1257     unsigned long ds_idx;
1258
1259     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1260        rate*seconds which occurred up to the last run.
1261        pdp_new[] contains rate*seconds from the latest run.
1262        pdp_temp[] will contain the rate for cdp */
1263
1264     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1265         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1266                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1267                            pdp_new, pdp_temp) == -1) {
1268             return -1;
1269         }
1270 #ifdef DEBUG
1271         fprintf(stderr, "PDP UPD ds[%lu]\t"
1272                 "elapsed_pdp_st %lu\t"
1273                 "pdp_temp %10.2f\t"
1274                 "new_prep %10.2f\t"
1275                 "new_unkn_sec %5lu\n",
1276                 ds_idx,
1277                 elapsed_pdp_st,
1278                 pdp_temp[ds_idx],
1279                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1280                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1281 #endif
1282     }
1283     return 0;
1284 }
1285
1286 /*
1287  * Process an update that occurs after one of the PDP moments.
1288  * Increments the PDP value, sets NAN if time greater than the
1289  * heartbeats have elapsed, processes CDEFs.
1290  *
1291  * Returns 0 on success, -1 on error.
1292  */
1293 static int process_pdp_st(
1294     rrd_t *rrd,
1295     unsigned long ds_idx,
1296     double interval,
1297     double pre_int,
1298     double post_int,
1299     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1300     rrd_value_t *pdp_new,
1301     rrd_value_t *pdp_temp)
1302 {
1303     int       i;
1304
1305     /* update pdp_prep to the current pdp_st. */
1306     double    pre_unknown = 0.0;
1307     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1308     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1309
1310     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1311
1312     rpnstack_init(&rpnstack);
1313
1314
1315     if (isnan(pdp_new[ds_idx])) {
1316         /* a final bit of unknown to be added before calculation
1317            we use a temporary variable for this so that we
1318            don't have to turn integer lines before using the value */
1319         pre_unknown = pre_int;
1320     } else {
1321         if (isnan(scratch[PDP_val].u_val)) {
1322             scratch[PDP_val].u_val = 0;
1323         }
1324         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1325     }
1326
1327     /* if too much of the pdp_prep is unknown we dump it */
1328     /* if the interval is larger thatn mrhb we get NAN */
1329     if ((interval > mrhb) ||
1330         (rrd->stat_head->pdp_step / 2.0 <
1331          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1332         pdp_temp[ds_idx] = DNAN;
1333     } else {
1334         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1335             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1336              pre_unknown);
1337     }
1338
1339     /* process CDEF data sources; remember each CDEF DS can
1340      * only reference other DS with a lower index number */
1341     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1342         rpnp_t   *rpnp;
1343
1344         rpnp =
1345             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1346         /* substitute data values for OP_VARIABLE nodes */
1347         for (i = 0; rpnp[i].op != OP_END; i++) {
1348             if (rpnp[i].op == OP_VARIABLE) {
1349                 rpnp[i].op = OP_NUMBER;
1350                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1351             }
1352         }
1353         /* run the rpn calculator */
1354         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1355             free(rpnp);
1356             rpnstack_free(&rpnstack);
1357             return -1;
1358         }
1359     }
1360
1361     /* make pdp_prep ready for the next run */
1362     if (isnan(pdp_new[ds_idx])) {
1363         /* this is not realy accurate if we use subsecond data arival time
1364            should have thought of it when going subsecond resolution ...
1365            sorry next format change we will have it! */
1366         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1367         scratch[PDP_val].u_val = DNAN;
1368     } else {
1369         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1370         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1371     }
1372     rpnstack_free(&rpnstack);
1373     return 0;
1374 }
1375
1376 /*
1377  * Iterate over all the RRAs for a given DS and:
1378  * 1. Decide whether to schedule a smooth later
1379  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1380  * 3. Update the CDP
1381  *
1382  * Returns 0 on success, -1 on error
1383  */
1384 static int update_all_cdp_prep(
1385     rrd_t *rrd,
1386     unsigned long *rra_step_cnt,
1387     unsigned long rra_begin,
1388     rrd_file_t *rrd_file,
1389     unsigned long elapsed_pdp_st,
1390     unsigned long proc_pdp_cnt,
1391     rrd_value_t **last_seasonal_coef,
1392     rrd_value_t **seasonal_coef,
1393     rrd_value_t *pdp_temp,
1394     unsigned long *skip_update,
1395     int *schedule_smooth)
1396 {
1397     unsigned long rra_idx;
1398
1399     /* index into the CDP scratch array */
1400     enum cf_en current_cf;
1401     unsigned long rra_start;
1402
1403     /* number of rows to be updated in an RRA for a data value. */
1404     unsigned long start_pdp_offset;
1405
1406     rra_start = rra_begin;
1407     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1408         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1409         start_pdp_offset =
1410             rrd->rra_def[rra_idx].pdp_cnt -
1411             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1412         skip_update[rra_idx] = 0;
1413         if (start_pdp_offset <= elapsed_pdp_st) {
1414             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1415                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1416         } else {
1417             rra_step_cnt[rra_idx] = 0;
1418         }
1419
1420         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1421             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1422              * so that they will be correct for the next observed value; note that for
1423              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1424              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1425             if (rra_step_cnt[rra_idx] > 1) {
1426                 skip_update[rra_idx] = 1;
1427                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1428                                 elapsed_pdp_st, last_seasonal_coef);
1429                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1430                                 elapsed_pdp_st + 1, seasonal_coef);
1431             }
1432             /* periodically run a smoother for seasonal effects */
1433             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1434 #ifdef DEBUG
1435                 fprintf(stderr,
1436                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1437                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1438                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1439                         u_cnt);
1440 #endif
1441                 *schedule_smooth = 1;
1442             }
1443         }
1444         if (rrd_test_error())
1445             return -1;
1446
1447         if (update_cdp_prep
1448             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1449              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1450              current_cf) == -1) {
1451             return -1;
1452         }
1453         rra_start +=
1454             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1455             sizeof(rrd_value_t);
1456     }
1457     return 0;
1458 }
1459
1460 /* 
1461  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1462  */
1463 static int do_schedule_smooth(
1464     rrd_t *rrd,
1465     unsigned long rra_idx,
1466     unsigned long elapsed_pdp_st)
1467 {
1468     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1469     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1470     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1471     unsigned long seasonal_smooth_idx =
1472         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1473     unsigned long *init_seasonal =
1474         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1475
1476     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1477      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1478      * really an RRA level, not a data source within RRA level parameter, but
1479      * the rra_def is read only for rrd_update (not flushed to disk). */
1480     if (*init_seasonal > BURNIN_CYCLES) {
1481         /* someone has no doubt invented a trick to deal with this wrap around,
1482          * but at least this code is clear. */
1483         if (seasonal_smooth_idx > cur_row) {
1484             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1485              * between PDP and CDP */
1486             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1487         }
1488         /* can't rely on negative numbers because we are working with
1489          * unsigned values */
1490         return (cur_row + elapsed_pdp_st >= row_cnt
1491                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1492     }
1493     /* mark off one of the burn-in cycles */
1494     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1495 }
1496
1497 /*
1498  * For a given RRA, iterate over the data sources and call the appropriate
1499  * consolidation function.
1500  *
1501  * Returns 0 on success, -1 on error.
1502  */
1503 static int update_cdp_prep(
1504     rrd_t *rrd,
1505     unsigned long elapsed_pdp_st,
1506     unsigned long start_pdp_offset,
1507     unsigned long *rra_step_cnt,
1508     int rra_idx,
1509     rrd_value_t *pdp_temp,
1510     rrd_value_t *last_seasonal_coef,
1511     rrd_value_t *seasonal_coef,
1512     int current_cf)
1513 {
1514     unsigned long ds_idx, cdp_idx;
1515
1516     /* update CDP_PREP areas */
1517     /* loop over data soures within each RRA */
1518     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1519
1520         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1521
1522         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1523             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1524                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1525                        elapsed_pdp_st, start_pdp_offset,
1526                        rrd->rra_def[rra_idx].pdp_cnt,
1527                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1528                        rra_idx, ds_idx);
1529         } else {
1530             /* Nothing to consolidate if there's one PDP per CDP. However, if
1531              * we've missed some PDPs, let's update null counters etc. */
1532             if (elapsed_pdp_st > 2) {
1533                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1534                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1535                           current_cf);
1536             }
1537         }
1538
1539         if (rrd_test_error())
1540             return -1;
1541     }                   /* endif data sources loop */
1542     return 0;
1543 }
1544
1545 /*
1546  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1547  * primary value, secondary value, and # of unknowns.
1548  */
1549 static void update_cdp(
1550     unival *scratch,
1551     int current_cf,
1552     rrd_value_t pdp_temp_val,
1553     unsigned long rra_step_cnt,
1554     unsigned long elapsed_pdp_st,
1555     unsigned long start_pdp_offset,
1556     unsigned long pdp_cnt,
1557     rrd_value_t xff,
1558     int i,
1559     int ii)
1560 {
1561     /* shorthand variables */
1562     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1563     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1564     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1565     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1566
1567     if (rra_step_cnt) {
1568         /* If we are in this block, as least 1 CDP value will be written to
1569          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1570          * to be written, then the "fill in" value is the CDP_secondary_val
1571          * entry. */
1572         if (isnan(pdp_temp_val)) {
1573             *cdp_unkn_pdp_cnt += start_pdp_offset;
1574             *cdp_secondary_val = DNAN;
1575         } else {
1576             /* CDP_secondary value is the RRA "fill in" value for intermediary
1577              * CDP data entries. No matter the CF, the value is the same because
1578              * the average, max, min, and last of a list of identical values is
1579              * the same, namely, the value itself. */
1580             *cdp_secondary_val = pdp_temp_val;
1581         }
1582
1583         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1584             *cdp_primary_val = DNAN;
1585             if (current_cf == CF_AVERAGE) {
1586                 *cdp_val =
1587                     initialize_average_carry_over(pdp_temp_val,
1588                                                   elapsed_pdp_st,
1589                                                   start_pdp_offset, pdp_cnt);
1590             } else {
1591                 *cdp_val = pdp_temp_val;
1592             }
1593         } else {
1594             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1595                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1596         }               /* endif meets xff value requirement for a valid value */
1597         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1598          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1599         if (isnan(pdp_temp_val))
1600             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1601         else
1602             *cdp_unkn_pdp_cnt = 0;
1603     } else {            /* rra_step_cnt[i]  == 0 */
1604
1605 #ifdef DEBUG
1606         if (isnan(*cdp_val)) {
1607             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1608                     i, ii);
1609         } else {
1610             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1611                     i, ii, *cdp_val);
1612         }
1613 #endif
1614         if (isnan(pdp_temp_val)) {
1615             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1616         } else {
1617             *cdp_val =
1618                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1619                                   current_cf, i, ii);
1620         }
1621     }
1622 }
1623
1624 /*
1625  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1626  * on the type of consolidation function.
1627  */
1628 static void initialize_cdp_val(
1629     unival *scratch,
1630     int current_cf,
1631     rrd_value_t pdp_temp_val,
1632     unsigned long elapsed_pdp_st,
1633     unsigned long start_pdp_offset,
1634     unsigned long pdp_cnt)
1635 {
1636     rrd_value_t cum_val, cur_val;
1637
1638     switch (current_cf) {
1639     case CF_AVERAGE:
1640         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1641         cur_val = IFDNAN(pdp_temp_val, 0.0);
1642         scratch[CDP_primary_val].u_val =
1643             (cum_val + cur_val * start_pdp_offset) /
1644             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1645         scratch[CDP_val].u_val =
1646             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1647                                           start_pdp_offset, pdp_cnt);
1648         break;
1649     case CF_MAXIMUM:
1650         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1651         cur_val = IFDNAN(pdp_temp_val, -DINF);
1652 #if 0
1653 #ifdef DEBUG
1654         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1655             fprintf(stderr,
1656                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1657                     i, ii);
1658             exit(-1);
1659         }
1660 #endif
1661 #endif
1662         if (cur_val > cum_val)
1663             scratch[CDP_primary_val].u_val = cur_val;
1664         else
1665             scratch[CDP_primary_val].u_val = cum_val;
1666         /* initialize carry over value */
1667         scratch[CDP_val].u_val = pdp_temp_val;
1668         break;
1669     case CF_MINIMUM:
1670         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1671         cur_val = IFDNAN(pdp_temp_val, DINF);
1672 #if 0
1673 #ifdef DEBUG
1674         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1675             fprintf(stderr,
1676                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1677                     ii);
1678             exit(-1);
1679         }
1680 #endif
1681 #endif
1682         if (cur_val < cum_val)
1683             scratch[CDP_primary_val].u_val = cur_val;
1684         else
1685             scratch[CDP_primary_val].u_val = cum_val;
1686         /* initialize carry over value */
1687         scratch[CDP_val].u_val = pdp_temp_val;
1688         break;
1689     case CF_LAST:
1690     default:
1691         scratch[CDP_primary_val].u_val = pdp_temp_val;
1692         /* initialize carry over value */
1693         scratch[CDP_val].u_val = pdp_temp_val;
1694         break;
1695     }
1696 }
1697
1698 /*
1699  * Update the consolidation function for Holt-Winters functions as
1700  * well as other functions that don't actually consolidate multiple
1701  * PDPs.
1702  */
1703 static void reset_cdp(
1704     rrd_t *rrd,
1705     unsigned long elapsed_pdp_st,
1706     rrd_value_t *pdp_temp,
1707     rrd_value_t *last_seasonal_coef,
1708     rrd_value_t *seasonal_coef,
1709     int rra_idx,
1710     int ds_idx,
1711     int cdp_idx,
1712     enum cf_en current_cf)
1713 {
1714     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1715
1716     switch (current_cf) {
1717     case CF_AVERAGE:
1718     default:
1719         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1720         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1721         break;
1722     case CF_SEASONAL:
1723     case CF_DEVSEASONAL:
1724         /* need to update cached seasonal values, so they are consistent
1725          * with the bulk update */
1726         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1727          * CDP_last_deviation are the same. */
1728         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1729         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1730         break;
1731     case CF_HWPREDICT:
1732     case CF_MHWPREDICT:
1733         /* need to update the null_count and last_null_count.
1734          * even do this for non-DNAN pdp_temp because the
1735          * algorithm is not learning from batch updates. */
1736         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1737         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1738         /* fall through */
1739     case CF_DEVPREDICT:
1740         scratch[CDP_primary_val].u_val = DNAN;
1741         scratch[CDP_secondary_val].u_val = DNAN;
1742         break;
1743     case CF_FAILURES:
1744         /* do not count missed bulk values as failures */
1745         scratch[CDP_primary_val].u_val = 0;
1746         scratch[CDP_secondary_val].u_val = 0;
1747         /* need to reset violations buffer.
1748          * could do this more carefully, but for now, just
1749          * assume a bulk update wipes away all violations. */
1750         erase_violations(rrd, cdp_idx, rra_idx);
1751         break;
1752     }
1753 }
1754
1755 static rrd_value_t initialize_average_carry_over(
1756     rrd_value_t pdp_temp_val,
1757     unsigned long elapsed_pdp_st,
1758     unsigned long start_pdp_offset,
1759     unsigned long pdp_cnt)
1760 {
1761     /* initialize carry over value */
1762     if (isnan(pdp_temp_val)) {
1763         return DNAN;
1764     }
1765     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1766 }
1767
1768 /*
1769  * Update or initialize a CDP value based on the consolidation
1770  * function.
1771  *
1772  * Returns the new value.
1773  */
1774 static rrd_value_t calculate_cdp_val(
1775     rrd_value_t cdp_val,
1776     rrd_value_t pdp_temp_val,
1777     unsigned long elapsed_pdp_st,
1778     int current_cf,
1779 #ifdef DEBUG
1780     int i,
1781     int ii
1782 #else
1783     int UNUSED(i),
1784     int UNUSED(ii)
1785 #endif
1786     )
1787 {
1788     if (isnan(cdp_val)) {
1789         if (current_cf == CF_AVERAGE) {
1790             pdp_temp_val *= elapsed_pdp_st;
1791         }
1792 #ifdef DEBUG
1793         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1794                 i, ii, pdp_temp_val);
1795 #endif
1796         return pdp_temp_val;
1797     }
1798     if (current_cf == CF_AVERAGE)
1799         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1800     if (current_cf == CF_MINIMUM)
1801         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1802     if (current_cf == CF_MAXIMUM)
1803         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1804
1805     return pdp_temp_val;
1806 }
1807
1808 /*
1809  * For each RRA, update the seasonal values and then call update_aberrant_CF
1810  * for each data source.
1811  *
1812  * Return 0 on success, -1 on error.
1813  */
1814 static int update_aberrant_cdps(
1815     rrd_t *rrd,
1816     rrd_file_t *rrd_file,
1817     unsigned long rra_begin,
1818     unsigned long elapsed_pdp_st,
1819     rrd_value_t *pdp_temp,
1820     rrd_value_t **seasonal_coef)
1821 {
1822     unsigned long rra_idx, ds_idx, j;
1823
1824     /* number of PDP steps since the last update that
1825      * are assigned to the first CDP to be generated
1826      * since the last update. */
1827     unsigned short scratch_idx;
1828     unsigned long rra_start;
1829     enum cf_en current_cf;
1830
1831     /* this loop is only entered if elapsed_pdp_st < 3 */
1832     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1833          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1834         rra_start = rra_begin;
1835         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1836             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1837                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1838                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1839                     if (scratch_idx == CDP_primary_val) {
1840                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1841                                         elapsed_pdp_st + 1, seasonal_coef);
1842                     } else {
1843                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1844                                         elapsed_pdp_st + 2, seasonal_coef);
1845                     }
1846                 }
1847                 if (rrd_test_error())
1848                     return -1;
1849                 /* loop over data soures within each RRA */
1850                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1851                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1852                                        rra_idx * (rrd->stat_head->ds_cnt) +
1853                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1854                                        *seasonal_coef);
1855                 }
1856             }
1857             rra_start += rrd->rra_def[rra_idx].row_cnt
1858                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1859         }
1860     }
1861     return 0;
1862 }
1863
1864 /* 
1865  * Move sequentially through the file, writing one RRA at a time.  Note this
1866  * architecture divorces the computation of CDP with flushing updated RRA
1867  * entries to disk.
1868  *
1869  * Return 0 on success, -1 on error.
1870  */
1871 static int write_to_rras(
1872     rrd_t *rrd,
1873     rrd_file_t *rrd_file,
1874     unsigned long *rra_step_cnt,
1875     unsigned long rra_begin,
1876     time_t current_time,
1877     unsigned long *skip_update,
1878     rrd_info_t ** pcdp_summary)
1879 {
1880     unsigned long rra_idx;
1881     unsigned long rra_start;
1882     time_t    rra_time = 0; /* time of update for a RRA */
1883
1884     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1885     
1886     /* Ready to write to disk */
1887     rra_start = rra_begin;
1888
1889     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1890         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1891         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1892
1893         /* for cdp_prep */
1894         unsigned short scratch_idx;
1895         unsigned long step_subtract;
1896
1897         for (scratch_idx = CDP_primary_val,
1898                  step_subtract = 1;
1899              rra_step_cnt[rra_idx] > 0;
1900              rra_step_cnt[rra_idx]--,
1901                  scratch_idx = CDP_secondary_val,
1902                  step_subtract = 2) {
1903
1904             size_t rra_pos_new;
1905 #ifdef DEBUG
1906             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1907 #endif
1908             /* increment, with wrap-around */
1909             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1910               rra_ptr->cur_row = 0;
1911
1912             /* we know what our position should be */
1913             rra_pos_new = rra_start
1914               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1915
1916             /* re-seek if the position is wrong or we wrapped around */
1917             if ((size_t)rra_pos_new != rrd_file->pos) {
1918                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1919                     rrd_set_error("seek error in rrd");
1920                     return -1;
1921                 }
1922             }
1923 #ifdef DEBUG
1924             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1925 #endif
1926
1927             if (skip_update[rra_idx])
1928                 continue;
1929
1930             if (*pcdp_summary != NULL) {
1931                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1932
1933                 rra_time = (current_time - current_time % step_time)
1934                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1935             }
1936
1937             if (write_RRA_row
1938                 (rrd_file, rrd, rra_idx, scratch_idx,
1939                  pcdp_summary, rra_time) == -1)
1940                 return -1;
1941
1942             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1943         }
1944
1945         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1946     } /* RRA LOOP */
1947
1948     return 0;
1949 }
1950
1951 /*
1952  * Write out one row of values (one value per DS) to the archive.
1953  *
1954  * Returns 0 on success, -1 on error.
1955  */
1956 static int write_RRA_row(
1957     rrd_file_t *rrd_file,
1958     rrd_t *rrd,
1959     unsigned long rra_idx,
1960     unsigned short CDP_scratch_idx,
1961     rrd_info_t ** pcdp_summary,
1962     time_t rra_time)
1963 {
1964     unsigned long ds_idx, cdp_idx;
1965     rrd_infoval_t iv;
1966
1967     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1968         /* compute the cdp index */
1969         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1970 #ifdef DEBUG
1971         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1972                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1973                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1974 #endif
1975         if (*pcdp_summary != NULL) {
1976             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1977             /* append info to the return hash */
1978             *pcdp_summary = rrd_info_push(*pcdp_summary,
1979                                           sprintf_alloc
1980                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
1981                                            (long long)rra_time,
1982                                            rrd->rra_def[rra_idx].cf_nam,
1983                                            rrd->rra_def[rra_idx].pdp_cnt,
1984                                            rrd->ds_def[ds_idx].ds_nam),
1985                                            RD_I_VAL, iv);
1986         }
1987         errno = 0;
1988         if (rrd_write(rrd_file,
1989                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1990                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1991             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1992             return -1;
1993         }
1994     }
1995     return 0;
1996 }
1997
1998 /*
1999  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2000  *
2001  * Returns 0 on success, -1 otherwise
2002  */
2003 static int smooth_all_rras(
2004     rrd_t *rrd,
2005     rrd_file_t *rrd_file,
2006     unsigned long rra_begin)
2007 {
2008     unsigned long rra_start = rra_begin;
2009     unsigned long rra_idx;
2010
2011     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2012         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2013             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2014 #ifdef DEBUG
2015             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2016 #endif
2017             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2018             if (rrd_test_error())
2019                 return -1;
2020         }
2021         rra_start += rrd->rra_def[rra_idx].row_cnt
2022             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2023     }
2024     return 0;
2025 }
2026
2027 #ifndef HAVE_MMAP
2028 /*
2029  * Flush changes to disk (unless we're using mmap)
2030  *
2031  * Returns 0 on success, -1 otherwise
2032  */
2033 static int write_changes_to_disk(
2034     rrd_t *rrd,
2035     rrd_file_t *rrd_file,
2036     int version)
2037 {
2038     /* we just need to write back the live header portion now */
2039     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2040                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2041                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2042                  SEEK_SET) != 0) {
2043         rrd_set_error("seek rrd for live header writeback");
2044         return -1;
2045     }
2046     if (version >= 3) {
2047         if (rrd_write(rrd_file, rrd->live_head,
2048                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2049             rrd_set_error("rrd_write live_head to rrd");
2050             return -1;
2051         }
2052     } else {
2053         if (rrd_write(rrd_file, rrd->legacy_last_up,
2054                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2055             rrd_set_error("rrd_write live_head to rrd");
2056             return -1;
2057         }
2058     }
2059
2060
2061     if (rrd_write(rrd_file, rrd->pdp_prep,
2062                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2063         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2064         rrd_set_error("rrd_write pdp_prep to rrd");
2065         return -1;
2066     }
2067
2068     if (rrd_write(rrd_file, rrd->cdp_prep,
2069                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2070                   rrd->stat_head->ds_cnt)
2071         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2072                       rrd->stat_head->ds_cnt)) {
2073
2074         rrd_set_error("rrd_write cdp_prep to rrd");
2075         return -1;
2076     }
2077
2078     if (rrd_write(rrd_file, rrd->rra_ptr,
2079                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2080         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2081         rrd_set_error("rrd_write rra_ptr to rrd");
2082         return -1;
2083     }
2084     return 0;
2085 }
2086 #endif