did not pick up all the changes for rrdcached in the first round ... so here is the...
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     time_t *current_time,
102     unsigned long *current_time_usec,
103     rrd_value_t *pdp_temp,
104     rrd_value_t *pdp_new,
105     unsigned long *rra_step_cnt,
106     char **updvals,
107     long *tmpl_idx,
108     unsigned long tmpl_cnt,
109     rrd_info_t ** pcdp_summary,
110     int version,
111     unsigned long *skip_update,
112     int *schedule_smooth);
113
114 static int parse_ds(
115     rrd_t *rrd,
116     char **updvals,
117     long *tmpl_idx,
118     char *input,
119     unsigned long tmpl_cnt,
120     time_t *current_time,
121     unsigned long *current_time_usec,
122     int version);
123
124 static int get_time_from_reading(
125     rrd_t *rrd,
126     char timesyntax,
127     char **updvals,
128     time_t *current_time,
129     unsigned long *current_time_usec,
130     int version);
131
132 static int update_pdp_prep(
133     rrd_t *rrd,
134     char **updvals,
135     rrd_value_t *pdp_new,
136     double interval);
137
138 static int calculate_elapsed_steps(
139     rrd_t *rrd,
140     unsigned long current_time,
141     unsigned long current_time_usec,
142     double interval,
143     double *pre_int,
144     double *post_int,
145     unsigned long *proc_pdp_cnt);
146
147 static void simple_update(
148     rrd_t *rrd,
149     double interval,
150     rrd_value_t *pdp_new);
151
152 static int process_all_pdp_st(
153     rrd_t *rrd,
154     double interval,
155     double pre_int,
156     double post_int,
157     unsigned long elapsed_pdp_st,
158     rrd_value_t *pdp_new,
159     rrd_value_t *pdp_temp);
160
161 static int process_pdp_st(
162     rrd_t *rrd,
163     unsigned long ds_idx,
164     double interval,
165     double pre_int,
166     double post_int,
167     long diff_pdp_st,
168     rrd_value_t *pdp_new,
169     rrd_value_t *pdp_temp);
170
171 static int update_all_cdp_prep(
172     rrd_t *rrd,
173     unsigned long *rra_step_cnt,
174     unsigned long rra_begin,
175     rrd_file_t *rrd_file,
176     unsigned long elapsed_pdp_st,
177     unsigned long proc_pdp_cnt,
178     rrd_value_t **last_seasonal_coef,
179     rrd_value_t **seasonal_coef,
180     rrd_value_t *pdp_temp,
181     unsigned long *skip_update,
182     int *schedule_smooth);
183
184 static int do_schedule_smooth(
185     rrd_t *rrd,
186     unsigned long rra_idx,
187     unsigned long elapsed_pdp_st);
188
189 static int update_cdp_prep(
190     rrd_t *rrd,
191     unsigned long elapsed_pdp_st,
192     unsigned long start_pdp_offset,
193     unsigned long *rra_step_cnt,
194     int rra_idx,
195     rrd_value_t *pdp_temp,
196     rrd_value_t *last_seasonal_coef,
197     rrd_value_t *seasonal_coef,
198     int current_cf);
199
200 static void update_cdp(
201     unival *scratch,
202     int current_cf,
203     rrd_value_t pdp_temp_val,
204     unsigned long rra_step_cnt,
205     unsigned long elapsed_pdp_st,
206     unsigned long start_pdp_offset,
207     unsigned long pdp_cnt,
208     rrd_value_t xff,
209     int i,
210     int ii);
211
212 static void initialize_cdp_val(
213     unival *scratch,
214     int current_cf,
215     rrd_value_t pdp_temp_val,
216     unsigned long elapsed_pdp_st,
217     unsigned long start_pdp_offset,
218     unsigned long pdp_cnt);
219
220 static void reset_cdp(
221     rrd_t *rrd,
222     unsigned long elapsed_pdp_st,
223     rrd_value_t *pdp_temp,
224     rrd_value_t *last_seasonal_coef,
225     rrd_value_t *seasonal_coef,
226     int rra_idx,
227     int ds_idx,
228     int cdp_idx,
229     enum cf_en current_cf);
230
231 static rrd_value_t initialize_average_carry_over(
232     rrd_value_t pdp_temp_val,
233     unsigned long elapsed_pdp_st,
234     unsigned long start_pdp_offset,
235     unsigned long pdp_cnt);
236
237 static rrd_value_t calculate_cdp_val(
238     rrd_value_t cdp_val,
239     rrd_value_t pdp_temp_val,
240     unsigned long elapsed_pdp_st,
241     int current_cf,
242     int i,
243     int ii);
244
245 static int update_aberrant_cdps(
246     rrd_t *rrd,
247     rrd_file_t *rrd_file,
248     unsigned long rra_begin,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     time_t current_time,
259     unsigned long *skip_update,
260     rrd_info_t ** pcdp_summary);
261
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned short CDP_scratch_idx,
267     rrd_info_t ** pcdp_summary,
268     time_t rra_time);
269
270 static int smooth_all_rras(
271     rrd_t *rrd,
272     rrd_file_t *rrd_file,
273     unsigned long rra_begin);
274
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277     rrd_t *rrd,
278     rrd_file_t *rrd_file,
279     int version);
280 #endif
281
282 /*
283  * normalize time as returned by gettimeofday. usec part must
284  * be always >= 0
285  */
286 static inline void normalize_time(
287     struct timeval *t)
288 {
289     if (t->tv_usec < 0) {
290         t->tv_sec--;
291         t->tv_usec += 1e6L;
292     }
293 }
294
295 /*
296  * Sets current_time and current_time_usec based on the current time.
297  * current_time_usec is set to 0 if the version number is 1 or 2.
298  */
299 static inline void initialize_time(
300     time_t *current_time,
301     unsigned long *current_time_usec,
302     int version)
303 {
304     struct timeval tmp_time;    /* used for time conversion */
305
306     gettimeofday(&tmp_time, 0);
307     normalize_time(&tmp_time);
308     *current_time = tmp_time.tv_sec;
309     if (version >= 3) {
310         *current_time_usec = tmp_time.tv_usec;
311     } else {
312         *current_time_usec = 0;
313     }
314 }
315
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317
318 rrd_info_t *rrd_update_v(
319     int argc,
320     char **argv)
321 {
322     char     *tmplt = NULL;
323     rrd_info_t *result = NULL;
324     rrd_infoval_t rc;
325     char *opt_daemon = NULL;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
330
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
334
335     while (1) {
336         int       option_index = 0;
337         int       opt;
338
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340
341         if (opt == EOF)
342             break;
343
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
348
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
354
355     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356     if (opt_daemon != NULL) {
357         rrd_set_error ("The \"%s\" environment variable is defined, "
358                 "but \"%s\" cannot work with rrdcached. Either unset "
359                 "the environment variable or use \"update\" instead.",
360                 ENV_RRDCACHED_ADDRESS, argv[0]);
361         goto end_tag;
362     }
363
364     /* need at least 2 arguments: filename, data. */
365     if (argc - optind < 2) {
366         rrd_set_error("Not enough arguments");
367         goto end_tag;
368     }
369     rc.u_int = 0;
370     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371     rc.u_int = _rrd_update(argv[optind], tmplt,
372                            argc - optind - 1,
373                            (const char **) (argv + optind + 1), result);
374     result->value.u_int = rc.u_int;
375   end_tag:
376     return result;
377 }
378
379 int rrd_update(
380     int argc,
381     char **argv)
382 {
383     struct option long_options[] = {
384         {"template", required_argument, 0, 't'},
385         {"daemon",   required_argument, 0, 'd'},
386         {0, 0, 0, 0}
387     };
388     int       option_index = 0;
389     int       opt;
390     char     *tmplt = NULL;
391     int       rc = -1;
392     char     *opt_daemon = NULL;
393
394     optind = 0;
395     opterr = 0;         /* initialize getopt */
396
397     while (1) {
398         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399
400         if (opt == EOF)
401             break;
402
403         switch (opt) {
404         case 't':
405             tmplt = strdup(optarg);
406             break;
407
408         case 'd':
409             if (opt_daemon != NULL)
410                 free (opt_daemon);
411             opt_daemon = strdup (optarg);
412             if (opt_daemon == NULL)
413             {
414                 rrd_set_error("strdup failed.");
415                 goto out;
416             }
417             break;
418
419         case '?':
420             rrd_set_error("unknown option '%s'", argv[optind - 1]);
421             goto out;
422         }
423     }
424
425     /* need at least 2 arguments: filename, data. */
426     if (argc - optind < 2) {
427         rrd_set_error("Not enough arguments");
428         goto out;
429     }
430
431     {   /* try to connect to rrdcached */
432         int status = rrdc_connect(opt_daemon);
433         if (status != 0) return status;
434     }
435
436     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437     {
438         rrd_set_error("The caching daemon cannot be used together with "
439                 "templates yet.");
440         goto out;
441     }
442
443     if (! rrdc_is_connected(opt_daemon))
444     {
445       rc = rrd_update_r(argv[optind], tmplt,
446                         argc - optind - 1, (const char **) (argv + optind + 1));
447     }
448     else /* we are connected */
449     {
450         rc = rrdc_update (argv[optind], /* file */
451                           argc - optind - 1, /* values_num */
452                           (void *) (argv + optind + 1)); /* values */
453         if (rc != 0)
454         {
455             rrd_set_error("Failed sending the values to rrdcached: %s",
456                     (rc < 0)
457                     ? "Internal error"
458                     : rrd_strerror (rc));
459         }
460     }
461
462   out:
463     if (tmplt != NULL)
464     {
465         free(tmplt);
466         tmplt = NULL;
467     }
468     if (opt_daemon != NULL)
469     {
470         free (opt_daemon);
471         opt_daemon = NULL;
472     }
473     return rc;
474 }
475
476 int rrd_update_r(
477     const char *filename,
478     const char *tmplt,
479     int argc,
480     const char **argv)
481 {
482     return _rrd_update(filename, tmplt, argc, argv, NULL);
483 }
484
485 int _rrd_update(
486     const char *filename,
487     const char *tmplt,
488     int argc,
489     const char **argv,
490     rrd_info_t * pcdp_summary)
491 {
492
493     int       arg_i = 2;
494
495     unsigned long rra_begin;    /* byte pointer to the rra
496                                  * area in the rrd file.  this
497                                  * pointer never changes value */
498     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
499                              * to the existing entry */
500     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
501                              * to the cdp values */
502
503     long     *tmpl_idx; /* index representing the settings
504                          * transported by the tmplt index */
505     unsigned long tmpl_cnt = 2; /* time and data */
506     rrd_t     rrd;
507     time_t    current_time = 0;
508     unsigned long current_time_usec = 0;    /* microseconds part of current time */
509     char    **updvals;
510     int       schedule_smooth = 0;
511
512     /* number of elapsed PDP steps since last update */
513     unsigned long *rra_step_cnt = NULL;
514
515     int       version;  /* rrd version */
516     rrd_file_t *rrd_file;
517     char     *arg_copy; /* for processing the argv */
518     unsigned long *skip_update; /* RRAs to advance but not write */
519
520     /* need at least 1 arguments: data. */
521     if (argc < 1) {
522         rrd_set_error("Not enough arguments");
523         goto err_out;
524     }
525
526     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
527         goto err_free;
528     }
529     /* We are now at the beginning of the rra's */
530     rra_begin = rrd_file->header_len;
531
532     version = atoi(rrd.stat_head->version);
533
534     initialize_time(&current_time, &current_time_usec, version);
535
536     /* get exclusive lock to whole file.
537      * lock gets removed when we close the file.
538      */
539     if (rrd_lock(rrd_file) != 0) {
540         rrd_set_error("could not lock RRD");
541         goto err_close;
542     }
543
544     if (allocate_data_structures(&rrd, &updvals,
545                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
546                                  &rra_step_cnt, &skip_update,
547                                  &pdp_new) == -1) {
548         goto err_close;
549     }
550
551     /* loop through the arguments. */
552     for (arg_i = 0; arg_i < argc; arg_i++) {
553         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
554             rrd_set_error("failed duplication argv entry");
555             break;
556         }
557         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
558                         &current_time, &current_time_usec, pdp_temp, pdp_new,
559                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
560                         &pcdp_summary, version, skip_update,
561                         &schedule_smooth) == -1) {
562             if (rrd_test_error()) { /* Should have error string always here */
563                 char     *save_error;
564
565                 /* Prepend file name to error message */
566                 if ((save_error = strdup(rrd_get_error())) != NULL) {
567                     rrd_set_error("%s: %s", filename, save_error);
568                     free(save_error);
569                 }
570             }
571             free(arg_copy);
572             break;
573         }
574         free(arg_copy);
575     }
576
577     free(rra_step_cnt);
578
579     /* if we got here and if there is an error and if the file has not been
580      * written to, then close things up and return. */
581     if (rrd_test_error()) {
582         goto err_free_structures;
583     }
584 #ifndef HAVE_MMAP
585     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
586         goto err_free_structures;
587     }
588 #endif
589
590     /* calling the smoothing code here guarantees at most one smoothing
591      * operation per rrd_update call. Unfortunately, it is possible with bulk
592      * updates, or a long-delayed update for smoothing to occur off-schedule.
593      * This really isn't critical except during the burn-in cycles. */
594     if (schedule_smooth) {
595         smooth_all_rras(&rrd, rrd_file, rra_begin);
596     }
597
598 /*    rrd_dontneed(rrd_file,&rrd); */
599     rrd_free(&rrd);
600     rrd_close(rrd_file);
601
602     free(pdp_new);
603     free(tmpl_idx);
604     free(pdp_temp);
605     free(skip_update);
606     free(updvals);
607     return 0;
608
609   err_free_structures:
610     free(pdp_new);
611     free(tmpl_idx);
612     free(pdp_temp);
613     free(skip_update);
614     free(updvals);
615   err_close:
616     rrd_close(rrd_file);
617   err_free:
618     rrd_free(&rrd);
619   err_out:
620     return -1;
621 }
622
623 /*
624  * get exclusive lock to whole file.
625  * lock gets removed when we close the file
626  *
627  * returns 0 on success
628  */
629 int rrd_lock(
630     rrd_file_t *file)
631 {
632     int       rcstat;
633
634     {
635 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
636         struct _stat st;
637
638         if (_fstat(file->fd, &st) == 0) {
639             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
640         } else {
641             rcstat = -1;
642         }
643 #else
644         struct flock lock;
645
646         lock.l_type = F_WRLCK;  /* exclusive write lock */
647         lock.l_len = 0; /* whole file */
648         lock.l_start = 0;   /* start of file */
649         lock.l_whence = SEEK_SET;   /* end of file */
650
651         rcstat = fcntl(file->fd, F_SETLK, &lock);
652 #endif
653     }
654
655     return (rcstat);
656 }
657
658 /*
659  * Allocate some important arrays used, and initialize the template.
660  *
661  * When it returns, either all of the structures are allocated
662  * or none of them are.
663  *
664  * Returns 0 on success, -1 on error.
665  */
666 static int allocate_data_structures(
667     rrd_t *rrd,
668     char ***updvals,
669     rrd_value_t **pdp_temp,
670     const char *tmplt,
671     long **tmpl_idx,
672     unsigned long *tmpl_cnt,
673     unsigned long **rra_step_cnt,
674     unsigned long **skip_update,
675     rrd_value_t **pdp_new)
676 {
677     unsigned  i, ii;
678     if ((*updvals = (char **) malloc(sizeof(char *)
679                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
680         rrd_set_error("allocating updvals pointer array.");
681         return -1;
682     }
683     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
684                                             * rrd->stat_head->ds_cnt)) ==
685         NULL) {
686         rrd_set_error("allocating pdp_temp.");
687         goto err_free_updvals;
688     }
689     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
690                                                  *
691                                                  rrd->stat_head->rra_cnt)) ==
692         NULL) {
693         rrd_set_error("allocating skip_update.");
694         goto err_free_pdp_temp;
695     }
696     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
697                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
698         rrd_set_error("allocating tmpl_idx.");
699         goto err_free_skip_update;
700     }
701     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
702                                                   *
703                                                   (rrd->stat_head->
704                                                    rra_cnt))) == NULL) {
705         rrd_set_error("allocating rra_step_cnt.");
706         goto err_free_tmpl_idx;
707     }
708
709     /* initialize tmplt redirector */
710     /* default config example (assume DS 1 is a CDEF DS)
711        tmpl_idx[0] -> 0; (time)
712        tmpl_idx[1] -> 1; (DS 0)
713        tmpl_idx[2] -> 3; (DS 2)
714        tmpl_idx[3] -> 4; (DS 3) */
715     (*tmpl_idx)[0] = 0; /* time */
716     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
717         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
718             (*tmpl_idx)[ii++] = i;
719     }
720     *tmpl_cnt = ii;
721
722     if (tmplt != NULL) {
723         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
724             goto err_free_rra_step_cnt;
725         }
726     }
727
728     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
729                                            * rrd->stat_head->ds_cnt)) == NULL) {
730         rrd_set_error("allocating pdp_new.");
731         goto err_free_rra_step_cnt;
732     }
733
734     return 0;
735
736   err_free_rra_step_cnt:
737     free(*rra_step_cnt);
738   err_free_tmpl_idx:
739     free(*tmpl_idx);
740   err_free_skip_update:
741     free(*skip_update);
742   err_free_pdp_temp:
743     free(*pdp_temp);
744   err_free_updvals:
745     free(*updvals);
746     return -1;
747 }
748
749 /*
750  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
751  *
752  * Returns 0 on success.
753  */
754 static int parse_template(
755     rrd_t *rrd,
756     const char *tmplt,
757     unsigned long *tmpl_cnt,
758     long *tmpl_idx)
759 {
760     char     *dsname, *tmplt_copy;
761     unsigned int tmpl_len, i;
762     int       ret = 0;
763
764     *tmpl_cnt = 1;      /* the first entry is the time */
765
766     /* we should work on a writeable copy here */
767     if ((tmplt_copy = strdup(tmplt)) == NULL) {
768         rrd_set_error("error copying tmplt '%s'", tmplt);
769         ret = -1;
770         goto out;
771     }
772
773     dsname = tmplt_copy;
774     tmpl_len = strlen(tmplt_copy);
775     for (i = 0; i <= tmpl_len; i++) {
776         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
777             tmplt_copy[i] = '\0';
778             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
779                 rrd_set_error("tmplt contains more DS definitions than RRD");
780                 ret = -1;
781                 goto out_free_tmpl_copy;
782             }
783             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
784                 rrd_set_error("unknown DS name '%s'", dsname);
785                 ret = -1;
786                 goto out_free_tmpl_copy;
787             }
788             /* go to the next entry on the tmplt_copy */
789             if (i < tmpl_len)
790                 dsname = &tmplt_copy[i + 1];
791         }
792     }
793   out_free_tmpl_copy:
794     free(tmplt_copy);
795   out:
796     return ret;
797 }
798
799 /*
800  * Parse an update string, updates the primary data points (PDPs)
801  * and consolidated data points (CDPs), and writes changes to the RRAs.
802  *
803  * Returns 0 on success, -1 on error.
804  */
805 static int process_arg(
806     char *step_start,
807     rrd_t *rrd,
808     rrd_file_t *rrd_file,
809     unsigned long rra_begin,
810     time_t *current_time,
811     unsigned long *current_time_usec,
812     rrd_value_t *pdp_temp,
813     rrd_value_t *pdp_new,
814     unsigned long *rra_step_cnt,
815     char **updvals,
816     long *tmpl_idx,
817     unsigned long tmpl_cnt,
818     rrd_info_t ** pcdp_summary,
819     int version,
820     unsigned long *skip_update,
821     int *schedule_smooth)
822 {
823     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
824
825     /* a vector of future Holt-Winters seasonal coefs */
826     unsigned long elapsed_pdp_st;
827
828     double    interval, pre_int, post_int;  /* interval between this and
829                                              * the last run */
830     unsigned long proc_pdp_cnt;
831
832     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
833                  current_time, current_time_usec, version) == -1) {
834         return -1;
835     }
836
837     interval = (double) (*current_time - rrd->live_head->last_up)
838         + (double) ((long) *current_time_usec -
839                     (long) rrd->live_head->last_up_usec) / 1e6f;
840
841     /* process the data sources and update the pdp_prep 
842      * area accordingly */
843     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
844         return -1;
845     }
846
847     elapsed_pdp_st = calculate_elapsed_steps(rrd,
848                                              *current_time,
849                                              *current_time_usec, interval,
850                                              &pre_int, &post_int,
851                                              &proc_pdp_cnt);
852
853     /* has a pdp_st moment occurred since the last run ? */
854     if (elapsed_pdp_st == 0) {
855         /* no we have not passed a pdp_st moment. therefore update is simple */
856         simple_update(rrd, interval, pdp_new);
857     } else {
858         /* an pdp_st has occurred. */
859         if (process_all_pdp_st(rrd, interval,
860                                pre_int, post_int,
861                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
862             return -1;
863         }
864         if (update_all_cdp_prep(rrd, rra_step_cnt,
865                                 rra_begin, rrd_file,
866                                 elapsed_pdp_st,
867                                 proc_pdp_cnt,
868                                 &last_seasonal_coef,
869                                 &seasonal_coef,
870                                 pdp_temp,
871                                 skip_update, schedule_smooth) == -1) {
872             goto err_free_coefficients;
873         }
874         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
875                                  elapsed_pdp_st, pdp_temp,
876                                  &seasonal_coef) == -1) {
877             goto err_free_coefficients;
878         }
879         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
880                           *current_time, skip_update,
881                           pcdp_summary) == -1) {
882             goto err_free_coefficients;
883         }
884     }                   /* endif a pdp_st has occurred */
885     rrd->live_head->last_up = *current_time;
886     rrd->live_head->last_up_usec = *current_time_usec;
887
888     if (version < 3) {
889         *rrd->legacy_last_up = rrd->live_head->last_up;
890     }
891     free(seasonal_coef);
892     free(last_seasonal_coef);
893     return 0;
894
895   err_free_coefficients:
896     free(seasonal_coef);
897     free(last_seasonal_coef);
898     return -1;
899 }
900
901 /*
902  * Parse a DS string (time + colon-separated values), storing the
903  * results in current_time, current_time_usec, and updvals.
904  *
905  * Returns 0 on success, -1 on error.
906  */
907 static int parse_ds(
908     rrd_t *rrd,
909     char **updvals,
910     long *tmpl_idx,
911     char *input,
912     unsigned long tmpl_cnt,
913     time_t *current_time,
914     unsigned long *current_time_usec,
915     int version)
916 {
917     char     *p;
918     unsigned long i;
919     char      timesyntax;
920
921     updvals[0] = input;
922     /* initialize all ds input to unknown except the first one
923        which has always got to be set */
924     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
925         updvals[i] = "U";
926
927     /* separate all ds elements; first must be examined separately
928        due to alternate time syntax */
929     if ((p = strchr(input, '@')) != NULL) {
930         timesyntax = '@';
931     } else if ((p = strchr(input, ':')) != NULL) {
932         timesyntax = ':';
933     } else {
934         rrd_set_error("expected timestamp not found in data source from %s",
935                       input);
936         return -1;
937     }
938     *p = '\0';
939     i = 1;
940     updvals[tmpl_idx[i++]] = p + 1;
941     while (*(++p)) {
942         if (*p == ':') {
943             *p = '\0';
944             if (i < tmpl_cnt) {
945                 updvals[tmpl_idx[i++]] = p + 1;
946             }
947         }
948     }
949
950     if (i != tmpl_cnt) {
951         rrd_set_error("expected %lu data source readings (got %lu) from %s",
952                       tmpl_cnt - 1, i, input);
953         return -1;
954     }
955
956     if (get_time_from_reading(rrd, timesyntax, updvals,
957                               current_time, current_time_usec,
958                               version) == -1) {
959         return -1;
960     }
961     return 0;
962 }
963
964 /*
965  * Parse the time in a DS string, store it in current_time and 
966  * current_time_usec and verify that it's later than the last
967  * update for this DS.
968  *
969  * Returns 0 on success, -1 on error.
970  */
971 static int get_time_from_reading(
972     rrd_t *rrd,
973     char timesyntax,
974     char **updvals,
975     time_t *current_time,
976     unsigned long *current_time_usec,
977     int version)
978 {
979     double    tmp;
980     char     *parsetime_error = NULL;
981     char     *old_locale;
982     rrd_time_value_t ds_tv;
983     struct timeval tmp_time;    /* used for time conversion */
984
985     /* get the time from the reading ... handle N */
986     if (timesyntax == '@') {    /* at-style */
987         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
988             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
989             return -1;
990         }
991         if (ds_tv.type == RELATIVE_TO_END_TIME ||
992             ds_tv.type == RELATIVE_TO_START_TIME) {
993             rrd_set_error("specifying time relative to the 'start' "
994                           "or 'end' makes no sense here: %s", updvals[0]);
995             return -1;
996         }
997         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
998         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
999     } else if (strcmp(updvals[0], "N") == 0) {
1000         gettimeofday(&tmp_time, 0);
1001         normalize_time(&tmp_time);
1002         *current_time = tmp_time.tv_sec;
1003         *current_time_usec = tmp_time.tv_usec;
1004     } else {
1005         old_locale = setlocale(LC_NUMERIC, "C");
1006         tmp = strtod(updvals[0], 0);
1007         setlocale(LC_NUMERIC, old_locale);
1008         *current_time = floor(tmp);
1009         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1010     }
1011     /* dont do any correction for old version RRDs */
1012     if (version < 3)
1013         *current_time_usec = 0;
1014
1015     if (*current_time < rrd->live_head->last_up ||
1016         (*current_time == rrd->live_head->last_up &&
1017          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1018         rrd_set_error("illegal attempt to update using time %ld when "
1019                       "last update time is %ld (minimum one second step)",
1020                       *current_time, rrd->live_head->last_up);
1021         return -1;
1022     }
1023     return 0;
1024 }
1025
1026 /*
1027  * Update pdp_new by interpreting the updvals according to the DS type
1028  * (COUNTER, GAUGE, etc.).
1029  *
1030  * Returns 0 on success, -1 on error.
1031  */
1032 static int update_pdp_prep(
1033     rrd_t *rrd,
1034     char **updvals,
1035     rrd_value_t *pdp_new,
1036     double interval)
1037 {
1038     unsigned long ds_idx;
1039     int       ii;
1040     char     *endptr;   /* used in the conversion */
1041     double    rate;
1042     char     *old_locale;
1043     enum dst_en dst_idx;
1044
1045     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1046         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1047
1048         /* make sure we do not build diffs with old last_ds values */
1049         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1050             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1051             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1052         }
1053
1054         /* NOTE: DST_CDEF should never enter this if block, because
1055          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1056          * accidently specified a value for the DST_CDEF. To handle this case,
1057          * an extra check is required. */
1058
1059         if ((updvals[ds_idx + 1][0] != 'U') &&
1060             (dst_idx != DST_CDEF) &&
1061             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1062             rate = DNAN;
1063
1064             /* pdp_new contains rate * time ... eg the bytes transferred during
1065              * the interval. Doing it this way saves a lot of math operations
1066              */
1067             switch (dst_idx) {
1068             case DST_COUNTER:
1069             case DST_DERIVE:
1070                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1071                     if ((updvals[ds_idx + 1][ii] < '0'
1072                          || updvals[ds_idx + 1][ii] > '9')
1073                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1074                         rrd_set_error("not a simple integer: '%s'",
1075                                       updvals[ds_idx + 1]);
1076                         return -1;
1077                     }
1078                 }
1079                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1080                     pdp_new[ds_idx] =
1081                         rrd_diff(updvals[ds_idx + 1],
1082                                  rrd->pdp_prep[ds_idx].last_ds);
1083                     if (dst_idx == DST_COUNTER) {
1084                         /* simple overflow catcher. This will fail
1085                          * terribly for non 32 or 64 bit counters
1086                          * ... are there any others in SNMP land?
1087                          */
1088                         if (pdp_new[ds_idx] < (double) 0.0)
1089                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1090                         if (pdp_new[ds_idx] < (double) 0.0)
1091                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1092                     }
1093                     rate = pdp_new[ds_idx] / interval;
1094                 } else {
1095                     pdp_new[ds_idx] = DNAN;
1096                 }
1097                 break;
1098             case DST_ABSOLUTE:
1099                 old_locale = setlocale(LC_NUMERIC, "C");
1100                 errno = 0;
1101                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1102                 setlocale(LC_NUMERIC, old_locale);
1103                 if (errno > 0) {
1104                     rrd_set_error("converting '%s' to float: %s",
1105                                   updvals[ds_idx + 1], rrd_strerror(errno));
1106                     return -1;
1107                 };
1108                 if (endptr[0] != '\0') {
1109                     rrd_set_error
1110                         ("conversion of '%s' to float not complete: tail '%s'",
1111                          updvals[ds_idx + 1], endptr);
1112                     return -1;
1113                 }
1114                 rate = pdp_new[ds_idx] / interval;
1115                 break;
1116             case DST_GAUGE:
1117                 errno = 0;
1118                 old_locale = setlocale(LC_NUMERIC, "C");
1119                 pdp_new[ds_idx] =
1120                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1121                 setlocale(LC_NUMERIC, old_locale);
1122                 if (errno) {
1123                     rrd_set_error("converting '%s' to float: %s",
1124                                   updvals[ds_idx + 1], rrd_strerror(errno));
1125                     return -1;
1126                 };
1127                 if (endptr[0] != '\0') {
1128                     rrd_set_error
1129                         ("conversion of '%s' to float not complete: tail '%s'",
1130                          updvals[ds_idx + 1], endptr);
1131                     return -1;
1132                 }
1133                 rate = pdp_new[ds_idx] / interval;
1134                 break;
1135             default:
1136                 rrd_set_error("rrd contains unknown DS type : '%s'",
1137                               rrd->ds_def[ds_idx].dst);
1138                 return -1;
1139             }
1140             /* break out of this for loop if the error string is set */
1141             if (rrd_test_error()) {
1142                 return -1;
1143             }
1144             /* make sure pdp_temp is neither too large or too small
1145              * if any of these occur it becomes unknown ...
1146              * sorry folks ... */
1147             if (!isnan(rate) &&
1148                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1149                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1150                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1151                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1152                 pdp_new[ds_idx] = DNAN;
1153             }
1154         } else {
1155             /* no news is news all the same */
1156             pdp_new[ds_idx] = DNAN;
1157         }
1158
1159
1160         /* make a copy of the command line argument for the next run */
1161 #ifdef DEBUG
1162         fprintf(stderr, "prep ds[%lu]\t"
1163                 "last_arg '%s'\t"
1164                 "this_arg '%s'\t"
1165                 "pdp_new %10.2f\n",
1166                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1167                 pdp_new[ds_idx]);
1168 #endif
1169         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1170                 LAST_DS_LEN - 1);
1171         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1172     }
1173     return 0;
1174 }
1175
1176 /*
1177  * How many PDP steps have elapsed since the last update? Returns the answer,
1178  * and stores the time between the last update and the last PDP in pre_time,
1179  * and the time between the last PDP and the current time in post_int.
1180  */
1181 static int calculate_elapsed_steps(
1182     rrd_t *rrd,
1183     unsigned long current_time,
1184     unsigned long current_time_usec,
1185     double interval,
1186     double *pre_int,
1187     double *post_int,
1188     unsigned long *proc_pdp_cnt)
1189 {
1190     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1191     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1192                                  * time */
1193     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1194                                  * when it was last updated */
1195     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1196
1197     /* when was the current pdp started */
1198     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1199     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1200
1201     /* when did the last pdp_st occur */
1202     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1203     occu_pdp_st = current_time - occu_pdp_age;
1204
1205     if (occu_pdp_st > proc_pdp_st) {
1206         /* OK we passed the pdp_st moment */
1207         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1208                                                                      * occurred before the latest
1209                                                                      * pdp_st moment*/
1210         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1211         *post_int = occu_pdp_age;   /* how much after it */
1212         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1213     } else {
1214         *pre_int = interval;
1215         *post_int = 0;
1216     }
1217
1218     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1219
1220 #ifdef DEBUG
1221     printf("proc_pdp_age %lu\t"
1222            "proc_pdp_st %lu\t"
1223            "occu_pfp_age %lu\t"
1224            "occu_pdp_st %lu\t"
1225            "int %lf\t"
1226            "pre_int %lf\t"
1227            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1228            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1229 #endif
1230
1231     /* compute the number of elapsed pdp_st moments */
1232     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1233 }
1234
1235 /*
1236  * Increment the PDP values by the values in pdp_new, or else initialize them.
1237  */
1238 static void simple_update(
1239     rrd_t *rrd,
1240     double interval,
1241     rrd_value_t *pdp_new)
1242 {
1243     int       i;
1244
1245     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1246         if (isnan(pdp_new[i])) {
1247             /* this is not really accurate if we use subsecond data arrival time
1248                should have thought of it when going subsecond resolution ...
1249                sorry next format change we will have it! */
1250             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1251                 floor(interval);
1252         } else {
1253             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1254                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1255             } else {
1256                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1257             }
1258         }
1259 #ifdef DEBUG
1260         fprintf(stderr,
1261                 "NO PDP  ds[%i]\t"
1262                 "value %10.2f\t"
1263                 "unkn_sec %5lu\n",
1264                 i,
1265                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1266                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1267 #endif
1268     }
1269 }
1270
1271 /*
1272  * Call process_pdp_st for each DS.
1273  *
1274  * Returns 0 on success, -1 on error.
1275  */
1276 static int process_all_pdp_st(
1277     rrd_t *rrd,
1278     double interval,
1279     double pre_int,
1280     double post_int,
1281     unsigned long elapsed_pdp_st,
1282     rrd_value_t *pdp_new,
1283     rrd_value_t *pdp_temp)
1284 {
1285     unsigned long ds_idx;
1286
1287     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1288        rate*seconds which occurred up to the last run.
1289        pdp_new[] contains rate*seconds from the latest run.
1290        pdp_temp[] will contain the rate for cdp */
1291
1292     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1293         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1294                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1295                            pdp_new, pdp_temp) == -1) {
1296             return -1;
1297         }
1298 #ifdef DEBUG
1299         fprintf(stderr, "PDP UPD ds[%lu]\t"
1300                 "elapsed_pdp_st %lu\t"
1301                 "pdp_temp %10.2f\t"
1302                 "new_prep %10.2f\t"
1303                 "new_unkn_sec %5lu\n",
1304                 ds_idx,
1305                 elapsed_pdp_st,
1306                 pdp_temp[ds_idx],
1307                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1308                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1309 #endif
1310     }
1311     return 0;
1312 }
1313
1314 /*
1315  * Process an update that occurs after one of the PDP moments.
1316  * Increments the PDP value, sets NAN if time greater than the
1317  * heartbeats have elapsed, processes CDEFs.
1318  *
1319  * Returns 0 on success, -1 on error.
1320  */
1321 static int process_pdp_st(
1322     rrd_t *rrd,
1323     unsigned long ds_idx,
1324     double interval,
1325     double pre_int,
1326     double post_int,
1327     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1328     rrd_value_t *pdp_new,
1329     rrd_value_t *pdp_temp)
1330 {
1331     int       i;
1332
1333     /* update pdp_prep to the current pdp_st. */
1334     double    pre_unknown = 0.0;
1335     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1336     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1337
1338     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1339
1340     rpnstack_init(&rpnstack);
1341
1342
1343     if (isnan(pdp_new[ds_idx])) {
1344         /* a final bit of unknown to be added before calculation
1345            we use a temporary variable for this so that we
1346            don't have to turn integer lines before using the value */
1347         pre_unknown = pre_int;
1348     } else {
1349         if (isnan(scratch[PDP_val].u_val)) {
1350             scratch[PDP_val].u_val = 0;
1351         }
1352         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1353     }
1354
1355     /* if too much of the pdp_prep is unknown we dump it */
1356     /* if the interval is larger thatn mrhb we get NAN */
1357     if ((interval > mrhb) ||
1358         (rrd->stat_head->pdp_step / 2.0 <
1359          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1360         pdp_temp[ds_idx] = DNAN;
1361     } else {
1362         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1363             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1364              pre_unknown);
1365     }
1366
1367     /* process CDEF data sources; remember each CDEF DS can
1368      * only reference other DS with a lower index number */
1369     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1370         rpnp_t   *rpnp;
1371
1372         rpnp =
1373             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1374         /* substitute data values for OP_VARIABLE nodes */
1375         for (i = 0; rpnp[i].op != OP_END; i++) {
1376             if (rpnp[i].op == OP_VARIABLE) {
1377                 rpnp[i].op = OP_NUMBER;
1378                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1379             }
1380         }
1381         /* run the rpn calculator */
1382         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1383             free(rpnp);
1384             rpnstack_free(&rpnstack);
1385             return -1;
1386         }
1387     }
1388
1389     /* make pdp_prep ready for the next run */
1390     if (isnan(pdp_new[ds_idx])) {
1391         /* this is not realy accurate if we use subsecond data arival time
1392            should have thought of it when going subsecond resolution ...
1393            sorry next format change we will have it! */
1394         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1395         scratch[PDP_val].u_val = DNAN;
1396     } else {
1397         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1398         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1399     }
1400     rpnstack_free(&rpnstack);
1401     return 0;
1402 }
1403
1404 /*
1405  * Iterate over all the RRAs for a given DS and:
1406  * 1. Decide whether to schedule a smooth later
1407  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1408  * 3. Update the CDP
1409  *
1410  * Returns 0 on success, -1 on error
1411  */
1412 static int update_all_cdp_prep(
1413     rrd_t *rrd,
1414     unsigned long *rra_step_cnt,
1415     unsigned long rra_begin,
1416     rrd_file_t *rrd_file,
1417     unsigned long elapsed_pdp_st,
1418     unsigned long proc_pdp_cnt,
1419     rrd_value_t **last_seasonal_coef,
1420     rrd_value_t **seasonal_coef,
1421     rrd_value_t *pdp_temp,
1422     unsigned long *skip_update,
1423     int *schedule_smooth)
1424 {
1425     unsigned long rra_idx;
1426
1427     /* index into the CDP scratch array */
1428     enum cf_en current_cf;
1429     unsigned long rra_start;
1430
1431     /* number of rows to be updated in an RRA for a data value. */
1432     unsigned long start_pdp_offset;
1433
1434     rra_start = rra_begin;
1435     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1436         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1437         start_pdp_offset =
1438             rrd->rra_def[rra_idx].pdp_cnt -
1439             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1440         skip_update[rra_idx] = 0;
1441         if (start_pdp_offset <= elapsed_pdp_st) {
1442             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1443                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1444         } else {
1445             rra_step_cnt[rra_idx] = 0;
1446         }
1447
1448         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1449             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1450              * so that they will be correct for the next observed value; note that for
1451              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1452              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1453             if (rra_step_cnt[rra_idx] > 1) {
1454                 skip_update[rra_idx] = 1;
1455                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1456                                 elapsed_pdp_st, last_seasonal_coef);
1457                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1458                                 elapsed_pdp_st + 1, seasonal_coef);
1459             }
1460             /* periodically run a smoother for seasonal effects */
1461             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1462 #ifdef DEBUG
1463                 fprintf(stderr,
1464                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1465                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1466                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1467                         u_cnt);
1468 #endif
1469                 *schedule_smooth = 1;
1470             }
1471         }
1472         if (rrd_test_error())
1473             return -1;
1474
1475         if (update_cdp_prep
1476             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1477              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1478              current_cf) == -1) {
1479             return -1;
1480         }
1481         rra_start +=
1482             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1483             sizeof(rrd_value_t);
1484     }
1485     return 0;
1486 }
1487
1488 /* 
1489  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1490  */
1491 static int do_schedule_smooth(
1492     rrd_t *rrd,
1493     unsigned long rra_idx,
1494     unsigned long elapsed_pdp_st)
1495 {
1496     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1497     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1498     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1499     unsigned long seasonal_smooth_idx =
1500         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1501     unsigned long *init_seasonal =
1502         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1503
1504     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1505      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1506      * really an RRA level, not a data source within RRA level parameter, but
1507      * the rra_def is read only for rrd_update (not flushed to disk). */
1508     if (*init_seasonal > BURNIN_CYCLES) {
1509         /* someone has no doubt invented a trick to deal with this wrap around,
1510          * but at least this code is clear. */
1511         if (seasonal_smooth_idx > cur_row) {
1512             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1513              * between PDP and CDP */
1514             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1515         }
1516         /* can't rely on negative numbers because we are working with
1517          * unsigned values */
1518         return (cur_row + elapsed_pdp_st >= row_cnt
1519                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1520     }
1521     /* mark off one of the burn-in cycles */
1522     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1523 }
1524
1525 /*
1526  * For a given RRA, iterate over the data sources and call the appropriate
1527  * consolidation function.
1528  *
1529  * Returns 0 on success, -1 on error.
1530  */
1531 static int update_cdp_prep(
1532     rrd_t *rrd,
1533     unsigned long elapsed_pdp_st,
1534     unsigned long start_pdp_offset,
1535     unsigned long *rra_step_cnt,
1536     int rra_idx,
1537     rrd_value_t *pdp_temp,
1538     rrd_value_t *last_seasonal_coef,
1539     rrd_value_t *seasonal_coef,
1540     int current_cf)
1541 {
1542     unsigned long ds_idx, cdp_idx;
1543
1544     /* update CDP_PREP areas */
1545     /* loop over data soures within each RRA */
1546     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1547
1548         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1549
1550         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1551             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1552                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1553                        elapsed_pdp_st, start_pdp_offset,
1554                        rrd->rra_def[rra_idx].pdp_cnt,
1555                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1556                        rra_idx, ds_idx);
1557         } else {
1558             /* Nothing to consolidate if there's one PDP per CDP. However, if
1559              * we've missed some PDPs, let's update null counters etc. */
1560             if (elapsed_pdp_st > 2) {
1561                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1562                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1563                           current_cf);
1564             }
1565         }
1566
1567         if (rrd_test_error())
1568             return -1;
1569     }                   /* endif data sources loop */
1570     return 0;
1571 }
1572
1573 /*
1574  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1575  * primary value, secondary value, and # of unknowns.
1576  */
1577 static void update_cdp(
1578     unival *scratch,
1579     int current_cf,
1580     rrd_value_t pdp_temp_val,
1581     unsigned long rra_step_cnt,
1582     unsigned long elapsed_pdp_st,
1583     unsigned long start_pdp_offset,
1584     unsigned long pdp_cnt,
1585     rrd_value_t xff,
1586     int i,
1587     int ii)
1588 {
1589     /* shorthand variables */
1590     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1591     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1592     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1593     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1594
1595     if (rra_step_cnt) {
1596         /* If we are in this block, as least 1 CDP value will be written to
1597          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1598          * to be written, then the "fill in" value is the CDP_secondary_val
1599          * entry. */
1600         if (isnan(pdp_temp_val)) {
1601             *cdp_unkn_pdp_cnt += start_pdp_offset;
1602             *cdp_secondary_val = DNAN;
1603         } else {
1604             /* CDP_secondary value is the RRA "fill in" value for intermediary
1605              * CDP data entries. No matter the CF, the value is the same because
1606              * the average, max, min, and last of a list of identical values is
1607              * the same, namely, the value itself. */
1608             *cdp_secondary_val = pdp_temp_val;
1609         }
1610
1611         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1612             *cdp_primary_val = DNAN;
1613             if (current_cf == CF_AVERAGE) {
1614                 *cdp_val =
1615                     initialize_average_carry_over(pdp_temp_val,
1616                                                   elapsed_pdp_st,
1617                                                   start_pdp_offset, pdp_cnt);
1618             } else {
1619                 *cdp_val = pdp_temp_val;
1620             }
1621         } else {
1622             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1623                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1624         }               /* endif meets xff value requirement for a valid value */
1625         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1626          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1627         if (isnan(pdp_temp_val))
1628             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1629         else
1630             *cdp_unkn_pdp_cnt = 0;
1631     } else {            /* rra_step_cnt[i]  == 0 */
1632
1633 #ifdef DEBUG
1634         if (isnan(*cdp_val)) {
1635             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1636                     i, ii);
1637         } else {
1638             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1639                     i, ii, *cdp_val);
1640         }
1641 #endif
1642         if (isnan(pdp_temp_val)) {
1643             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1644         } else {
1645             *cdp_val =
1646                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1647                                   current_cf, i, ii);
1648         }
1649     }
1650 }
1651
1652 /*
1653  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1654  * on the type of consolidation function.
1655  */
1656 static void initialize_cdp_val(
1657     unival *scratch,
1658     int current_cf,
1659     rrd_value_t pdp_temp_val,
1660     unsigned long elapsed_pdp_st,
1661     unsigned long start_pdp_offset,
1662     unsigned long pdp_cnt)
1663 {
1664     rrd_value_t cum_val, cur_val;
1665
1666     switch (current_cf) {
1667     case CF_AVERAGE:
1668         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1669         cur_val = IFDNAN(pdp_temp_val, 0.0);
1670         scratch[CDP_primary_val].u_val =
1671             (cum_val + cur_val * start_pdp_offset) /
1672             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1673         scratch[CDP_val].u_val =
1674             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1675                                           start_pdp_offset, pdp_cnt);
1676         break;
1677     case CF_MAXIMUM:
1678         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1679         cur_val = IFDNAN(pdp_temp_val, -DINF);
1680 #if 0
1681 #ifdef DEBUG
1682         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1683             fprintf(stderr,
1684                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1685                     i, ii);
1686             exit(-1);
1687         }
1688 #endif
1689 #endif
1690         if (cur_val > cum_val)
1691             scratch[CDP_primary_val].u_val = cur_val;
1692         else
1693             scratch[CDP_primary_val].u_val = cum_val;
1694         /* initialize carry over value */
1695         scratch[CDP_val].u_val = pdp_temp_val;
1696         break;
1697     case CF_MINIMUM:
1698         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1699         cur_val = IFDNAN(pdp_temp_val, DINF);
1700 #if 0
1701 #ifdef DEBUG
1702         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1703             fprintf(stderr,
1704                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1705                     ii);
1706             exit(-1);
1707         }
1708 #endif
1709 #endif
1710         if (cur_val < cum_val)
1711             scratch[CDP_primary_val].u_val = cur_val;
1712         else
1713             scratch[CDP_primary_val].u_val = cum_val;
1714         /* initialize carry over value */
1715         scratch[CDP_val].u_val = pdp_temp_val;
1716         break;
1717     case CF_LAST:
1718     default:
1719         scratch[CDP_primary_val].u_val = pdp_temp_val;
1720         /* initialize carry over value */
1721         scratch[CDP_val].u_val = pdp_temp_val;
1722         break;
1723     }
1724 }
1725
1726 /*
1727  * Update the consolidation function for Holt-Winters functions as
1728  * well as other functions that don't actually consolidate multiple
1729  * PDPs.
1730  */
1731 static void reset_cdp(
1732     rrd_t *rrd,
1733     unsigned long elapsed_pdp_st,
1734     rrd_value_t *pdp_temp,
1735     rrd_value_t *last_seasonal_coef,
1736     rrd_value_t *seasonal_coef,
1737     int rra_idx,
1738     int ds_idx,
1739     int cdp_idx,
1740     enum cf_en current_cf)
1741 {
1742     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1743
1744     switch (current_cf) {
1745     case CF_AVERAGE:
1746     default:
1747         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1748         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1749         break;
1750     case CF_SEASONAL:
1751     case CF_DEVSEASONAL:
1752         /* need to update cached seasonal values, so they are consistent
1753          * with the bulk update */
1754         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1755          * CDP_last_deviation are the same. */
1756         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1757         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1758         break;
1759     case CF_HWPREDICT:
1760     case CF_MHWPREDICT:
1761         /* need to update the null_count and last_null_count.
1762          * even do this for non-DNAN pdp_temp because the
1763          * algorithm is not learning from batch updates. */
1764         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1765         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1766         /* fall through */
1767     case CF_DEVPREDICT:
1768         scratch[CDP_primary_val].u_val = DNAN;
1769         scratch[CDP_secondary_val].u_val = DNAN;
1770         break;
1771     case CF_FAILURES:
1772         /* do not count missed bulk values as failures */
1773         scratch[CDP_primary_val].u_val = 0;
1774         scratch[CDP_secondary_val].u_val = 0;
1775         /* need to reset violations buffer.
1776          * could do this more carefully, but for now, just
1777          * assume a bulk update wipes away all violations. */
1778         erase_violations(rrd, cdp_idx, rra_idx);
1779         break;
1780     }
1781 }
1782
1783 static rrd_value_t initialize_average_carry_over(
1784     rrd_value_t pdp_temp_val,
1785     unsigned long elapsed_pdp_st,
1786     unsigned long start_pdp_offset,
1787     unsigned long pdp_cnt)
1788 {
1789     /* initialize carry over value */
1790     if (isnan(pdp_temp_val)) {
1791         return DNAN;
1792     }
1793     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1794 }
1795
1796 /*
1797  * Update or initialize a CDP value based on the consolidation
1798  * function.
1799  *
1800  * Returns the new value.
1801  */
1802 static rrd_value_t calculate_cdp_val(
1803     rrd_value_t cdp_val,
1804     rrd_value_t pdp_temp_val,
1805     unsigned long elapsed_pdp_st,
1806     int current_cf,
1807 #ifdef DEBUG
1808     int i,
1809     int ii
1810 #else
1811     int UNUSED(i),
1812     int UNUSED(ii)
1813 #endif
1814     )
1815 {
1816     if (isnan(cdp_val)) {
1817         if (current_cf == CF_AVERAGE) {
1818             pdp_temp_val *= elapsed_pdp_st;
1819         }
1820 #ifdef DEBUG
1821         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1822                 i, ii, pdp_temp_val);
1823 #endif
1824         return pdp_temp_val;
1825     }
1826     if (current_cf == CF_AVERAGE)
1827         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1828     if (current_cf == CF_MINIMUM)
1829         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1830     if (current_cf == CF_MAXIMUM)
1831         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1832
1833     return pdp_temp_val;
1834 }
1835
1836 /*
1837  * For each RRA, update the seasonal values and then call update_aberrant_CF
1838  * for each data source.
1839  *
1840  * Return 0 on success, -1 on error.
1841  */
1842 static int update_aberrant_cdps(
1843     rrd_t *rrd,
1844     rrd_file_t *rrd_file,
1845     unsigned long rra_begin,
1846     unsigned long elapsed_pdp_st,
1847     rrd_value_t *pdp_temp,
1848     rrd_value_t **seasonal_coef)
1849 {
1850     unsigned long rra_idx, ds_idx, j;
1851
1852     /* number of PDP steps since the last update that
1853      * are assigned to the first CDP to be generated
1854      * since the last update. */
1855     unsigned short scratch_idx;
1856     unsigned long rra_start;
1857     enum cf_en current_cf;
1858
1859     /* this loop is only entered if elapsed_pdp_st < 3 */
1860     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1861          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1862         rra_start = rra_begin;
1863         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1864             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1865                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1866                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1867                     if (scratch_idx == CDP_primary_val) {
1868                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1869                                         elapsed_pdp_st + 1, seasonal_coef);
1870                     } else {
1871                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1872                                         elapsed_pdp_st + 2, seasonal_coef);
1873                     }
1874                 }
1875                 if (rrd_test_error())
1876                     return -1;
1877                 /* loop over data soures within each RRA */
1878                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1879                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1880                                        rra_idx * (rrd->stat_head->ds_cnt) +
1881                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1882                                        *seasonal_coef);
1883                 }
1884             }
1885             rra_start += rrd->rra_def[rra_idx].row_cnt
1886                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1887         }
1888     }
1889     return 0;
1890 }
1891
1892 /* 
1893  * Move sequentially through the file, writing one RRA at a time.  Note this
1894  * architecture divorces the computation of CDP with flushing updated RRA
1895  * entries to disk.
1896  *
1897  * Return 0 on success, -1 on error.
1898  */
1899 static int write_to_rras(
1900     rrd_t *rrd,
1901     rrd_file_t *rrd_file,
1902     unsigned long *rra_step_cnt,
1903     unsigned long rra_begin,
1904     time_t current_time,
1905     unsigned long *skip_update,
1906     rrd_info_t ** pcdp_summary)
1907 {
1908     unsigned long rra_idx;
1909     unsigned long rra_start;
1910     time_t    rra_time = 0; /* time of update for a RRA */
1911
1912     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1913     
1914     /* Ready to write to disk */
1915     rra_start = rra_begin;
1916
1917     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1918         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1919         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1920
1921         /* for cdp_prep */
1922         unsigned short scratch_idx;
1923         unsigned long step_subtract;
1924
1925         for (scratch_idx = CDP_primary_val,
1926                  step_subtract = 1;
1927              rra_step_cnt[rra_idx] > 0;
1928              rra_step_cnt[rra_idx]--,
1929                  scratch_idx = CDP_secondary_val,
1930                  step_subtract = 2) {
1931
1932             off_t rra_pos_new;
1933 #ifdef DEBUG
1934             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1935 #endif
1936             /* increment, with wrap-around */
1937             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1938               rra_ptr->cur_row = 0;
1939
1940             /* we know what our position should be */
1941             rra_pos_new = rra_start
1942               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1943
1944             /* re-seek if the position is wrong or we wrapped around */
1945             if (rra_pos_new != rrd_file->pos) {
1946                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1947                     rrd_set_error("seek error in rrd");
1948                     return -1;
1949                 }
1950             }
1951 #ifdef DEBUG
1952             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1953 #endif
1954
1955             if (skip_update[rra_idx])
1956                 continue;
1957
1958             if (*pcdp_summary != NULL) {
1959                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1960
1961                 rra_time = (current_time - current_time % step_time)
1962                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1963             }
1964
1965             if (write_RRA_row
1966                 (rrd_file, rrd, rra_idx, scratch_idx,
1967                  pcdp_summary, rra_time) == -1)
1968                 return -1;
1969         }
1970
1971         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1972     } /* RRA LOOP */
1973
1974     return 0;
1975 }
1976
1977 /*
1978  * Write out one row of values (one value per DS) to the archive.
1979  *
1980  * Returns 0 on success, -1 on error.
1981  */
1982 static int write_RRA_row(
1983     rrd_file_t *rrd_file,
1984     rrd_t *rrd,
1985     unsigned long rra_idx,
1986     unsigned short CDP_scratch_idx,
1987     rrd_info_t ** pcdp_summary,
1988     time_t rra_time)
1989 {
1990     unsigned long ds_idx, cdp_idx;
1991     rrd_infoval_t iv;
1992
1993     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1994         /* compute the cdp index */
1995         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1996 #ifdef DEBUG
1997         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1998                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1999                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2000 #endif
2001         if (*pcdp_summary != NULL) {
2002             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2003             /* append info to the return hash */
2004             *pcdp_summary = rrd_info_push(*pcdp_summary,
2005                                           sprintf_alloc
2006                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2007                                            rrd->rra_def[rra_idx].cf_nam,
2008                                            rrd->rra_def[rra_idx].pdp_cnt,
2009                                            rrd->ds_def[ds_idx].ds_nam),
2010                                           RD_I_VAL, iv);
2011         }
2012         if (rrd_write(rrd_file,
2013                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2014                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2015             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2016             return -1;
2017         }
2018     }
2019     return 0;
2020 }
2021
2022 /*
2023  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2024  *
2025  * Returns 0 on success, -1 otherwise
2026  */
2027 static int smooth_all_rras(
2028     rrd_t *rrd,
2029     rrd_file_t *rrd_file,
2030     unsigned long rra_begin)
2031 {
2032     unsigned long rra_start = rra_begin;
2033     unsigned long rra_idx;
2034
2035     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2036         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2037             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2038 #ifdef DEBUG
2039             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2040 #endif
2041             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2042             if (rrd_test_error())
2043                 return -1;
2044         }
2045         rra_start += rrd->rra_def[rra_idx].row_cnt
2046             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2047     }
2048     return 0;
2049 }
2050
2051 #ifndef HAVE_MMAP
2052 /*
2053  * Flush changes to disk (unless we're using mmap)
2054  *
2055  * Returns 0 on success, -1 otherwise
2056  */
2057 static int write_changes_to_disk(
2058     rrd_t *rrd,
2059     rrd_file_t *rrd_file,
2060     int version)
2061 {
2062     /* we just need to write back the live header portion now */
2063     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2064                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2065                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2066                  SEEK_SET) != 0) {
2067         rrd_set_error("seek rrd for live header writeback");
2068         return -1;
2069     }
2070     if (version >= 3) {
2071         if (rrd_write(rrd_file, rrd->live_head,
2072                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2073             rrd_set_error("rrd_write live_head to rrd");
2074             return -1;
2075         }
2076     } else {
2077         if (rrd_write(rrd_file, rrd->legacy_last_up,
2078                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2079             rrd_set_error("rrd_write live_head to rrd");
2080             return -1;
2081         }
2082     }
2083
2084
2085     if (rrd_write(rrd_file, rrd->pdp_prep,
2086                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2087         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2088         rrd_set_error("rrd_write pdp_prep to rrd");
2089         return -1;
2090     }
2091
2092     if (rrd_write(rrd_file, rrd->cdp_prep,
2093                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2094                   rrd->stat_head->ds_cnt)
2095         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2096                       rrd->stat_head->ds_cnt)) {
2097
2098         rrd_set_error("rrd_write cdp_prep to rrd");
2099         return -1;
2100     }
2101
2102     if (rrd_write(rrd_file, rrd->rra_ptr,
2103                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2104         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2105         rrd_set_error("rrd_write rra_ptr to rrd");
2106         return -1;
2107     }
2108     return 0;
2109 }
2110 #endif