Merge branch 'master' into ff/rrdd
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     time_t *current_time,
102     unsigned long *current_time_usec,
103     rrd_value_t *pdp_temp,
104     rrd_value_t *pdp_new,
105     unsigned long *rra_step_cnt,
106     char **updvals,
107     long *tmpl_idx,
108     unsigned long tmpl_cnt,
109     rrd_info_t ** pcdp_summary,
110     int version,
111     unsigned long *skip_update,
112     int *schedule_smooth);
113
114 static int parse_ds(
115     rrd_t *rrd,
116     char **updvals,
117     long *tmpl_idx,
118     char *input,
119     unsigned long tmpl_cnt,
120     time_t *current_time,
121     unsigned long *current_time_usec,
122     int version);
123
124 static int get_time_from_reading(
125     rrd_t *rrd,
126     char timesyntax,
127     char **updvals,
128     time_t *current_time,
129     unsigned long *current_time_usec,
130     int version);
131
132 static int update_pdp_prep(
133     rrd_t *rrd,
134     char **updvals,
135     rrd_value_t *pdp_new,
136     double interval);
137
138 static int calculate_elapsed_steps(
139     rrd_t *rrd,
140     unsigned long current_time,
141     unsigned long current_time_usec,
142     double interval,
143     double *pre_int,
144     double *post_int,
145     unsigned long *proc_pdp_cnt);
146
147 static void simple_update(
148     rrd_t *rrd,
149     double interval,
150     rrd_value_t *pdp_new);
151
152 static int process_all_pdp_st(
153     rrd_t *rrd,
154     double interval,
155     double pre_int,
156     double post_int,
157     unsigned long elapsed_pdp_st,
158     rrd_value_t *pdp_new,
159     rrd_value_t *pdp_temp);
160
161 static int process_pdp_st(
162     rrd_t *rrd,
163     unsigned long ds_idx,
164     double interval,
165     double pre_int,
166     double post_int,
167     long diff_pdp_st,
168     rrd_value_t *pdp_new,
169     rrd_value_t *pdp_temp);
170
171 static int update_all_cdp_prep(
172     rrd_t *rrd,
173     unsigned long *rra_step_cnt,
174     unsigned long rra_begin,
175     rrd_file_t *rrd_file,
176     unsigned long elapsed_pdp_st,
177     unsigned long proc_pdp_cnt,
178     rrd_value_t **last_seasonal_coef,
179     rrd_value_t **seasonal_coef,
180     rrd_value_t *pdp_temp,
181     unsigned long *skip_update,
182     int *schedule_smooth);
183
184 static int do_schedule_smooth(
185     rrd_t *rrd,
186     unsigned long rra_idx,
187     unsigned long elapsed_pdp_st);
188
189 static int update_cdp_prep(
190     rrd_t *rrd,
191     unsigned long elapsed_pdp_st,
192     unsigned long start_pdp_offset,
193     unsigned long *rra_step_cnt,
194     int rra_idx,
195     rrd_value_t *pdp_temp,
196     rrd_value_t *last_seasonal_coef,
197     rrd_value_t *seasonal_coef,
198     int current_cf);
199
200 static void update_cdp(
201     unival *scratch,
202     int current_cf,
203     rrd_value_t pdp_temp_val,
204     unsigned long rra_step_cnt,
205     unsigned long elapsed_pdp_st,
206     unsigned long start_pdp_offset,
207     unsigned long pdp_cnt,
208     rrd_value_t xff,
209     int i,
210     int ii);
211
212 static void initialize_cdp_val(
213     unival *scratch,
214     int current_cf,
215     rrd_value_t pdp_temp_val,
216     unsigned long elapsed_pdp_st,
217     unsigned long start_pdp_offset,
218     unsigned long pdp_cnt);
219
220 static void reset_cdp(
221     rrd_t *rrd,
222     unsigned long elapsed_pdp_st,
223     rrd_value_t *pdp_temp,
224     rrd_value_t *last_seasonal_coef,
225     rrd_value_t *seasonal_coef,
226     int rra_idx,
227     int ds_idx,
228     int cdp_idx,
229     enum cf_en current_cf);
230
231 static rrd_value_t initialize_average_carry_over(
232     rrd_value_t pdp_temp_val,
233     unsigned long elapsed_pdp_st,
234     unsigned long start_pdp_offset,
235     unsigned long pdp_cnt);
236
237 static rrd_value_t calculate_cdp_val(
238     rrd_value_t cdp_val,
239     rrd_value_t pdp_temp_val,
240     unsigned long elapsed_pdp_st,
241     int current_cf,
242     int i,
243     int ii);
244
245 static int update_aberrant_cdps(
246     rrd_t *rrd,
247     rrd_file_t *rrd_file,
248     unsigned long rra_begin,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     time_t current_time,
259     unsigned long *skip_update,
260     rrd_info_t ** pcdp_summary);
261
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned short CDP_scratch_idx,
267     rrd_info_t ** pcdp_summary,
268     time_t rra_time);
269
270 static int smooth_all_rras(
271     rrd_t *rrd,
272     rrd_file_t *rrd_file,
273     unsigned long rra_begin);
274
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277     rrd_t *rrd,
278     rrd_file_t *rrd_file,
279     int version);
280 #endif
281
282 /*
283  * normalize time as returned by gettimeofday. usec part must
284  * be always >= 0
285  */
286 static inline void normalize_time(
287     struct timeval *t)
288 {
289     if (t->tv_usec < 0) {
290         t->tv_sec--;
291         t->tv_usec += 1e6L;
292     }
293 }
294
295 /*
296  * Sets current_time and current_time_usec based on the current time.
297  * current_time_usec is set to 0 if the version number is 1 or 2.
298  */
299 static inline void initialize_time(
300     time_t *current_time,
301     unsigned long *current_time_usec,
302     int version)
303 {
304     struct timeval tmp_time;    /* used for time conversion */
305
306     gettimeofday(&tmp_time, 0);
307     normalize_time(&tmp_time);
308     *current_time = tmp_time.tv_sec;
309     if (version >= 3) {
310         *current_time_usec = tmp_time.tv_usec;
311     } else {
312         *current_time_usec = 0;
313     }
314 }
315
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317
318 rrd_info_t *rrd_update_v(
319     int argc,
320     char **argv)
321 {
322     char     *tmplt = NULL;
323     rrd_info_t *result = NULL;
324     rrd_infoval_t rc;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
329
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
333
334     while (1) {
335         int       option_index = 0;
336         int       opt;
337
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339
340         if (opt == EOF)
341             break;
342
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
347
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
353
354     /* need at least 2 arguments: filename, data. */
355     if (argc - optind < 2) {
356         rrd_set_error("Not enough arguments");
357         goto end_tag;
358     }
359     rc.u_int = 0;
360     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
361     rc.u_int = _rrd_update(argv[optind], tmplt,
362                            argc - optind - 1,
363                            (const char **) (argv + optind + 1), result);
364     result->value.u_int = rc.u_int;
365   end_tag:
366     return result;
367 }
368
369 int rrd_update(
370     int argc,
371     char **argv)
372 {
373     struct option long_options[] = {
374         {"template", required_argument, 0, 't'},
375         {"daemon",   required_argument, 0, 'd'},
376         {0, 0, 0, 0}
377     };
378     int       option_index = 0;
379     int       opt;
380     char     *tmplt = NULL;
381     int       rc = -1;
382     char     *opt_daemon = NULL;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case 'd':
399             if (opt_daemon != NULL)
400                 free (opt_daemon);
401             opt_daemon = strdup (optarg);
402             if (opt_daemon == NULL)
403             {
404                 rrd_set_error("strdup failed.");
405                 goto out;
406             }
407             break;
408
409         case '?':
410             rrd_set_error("unknown option '%s'", argv[optind - 1]);
411             goto out;
412         }
413     }
414
415     /* need at least 2 arguments: filename, data. */
416     if (argc - optind < 2) {
417         rrd_set_error("Not enough arguments");
418         goto out;
419     }
420
421     if ((tmplt != NULL) && (opt_daemon != NULL))
422     {
423         rrd_set_error("The caching opt_daemon cannot be used together with "
424                 "templates yet.");
425         goto out;
426     }
427
428     if ((tmplt == NULL) && (opt_daemon == NULL))
429     {
430         char *temp;
431
432         temp = getenv (ENV_RRDCACHED_ADDRESS);
433         if (temp != NULL)
434         {
435             opt_daemon = strdup (temp);
436             if (opt_daemon == NULL)
437             {
438                 rrd_set_error("strdup failed.");
439                 goto out;
440             }
441         }
442     }
443
444     if (opt_daemon != NULL)
445     {
446         int status;
447
448         status = rrdc_connect (opt_daemon);
449         if (status != 0)
450         {
451             rrd_set_error("Unable to connect to opt_daemon: %s",
452                     (status < 0)
453                     ? "Internal error"
454                     : rrd_strerror (status));
455             goto out;
456         }
457
458         status = rrdc_update (/* file = */ argv[optind],
459                 /* values_num = */ argc - optind - 1,
460                 /* values = */ (void *) (argv + optind + 1));
461         if (status != 0)
462         {
463             rrd_set_error("Failed sending the values to the opt_daemon: %s",
464                     (status < 0)
465                     ? "Internal error"
466                     : rrd_strerror (status));
467         }
468         else
469         {
470             rc = 0;
471         }
472
473         rrdc_disconnect ();
474         goto out;
475     } /* if (opt_daemon != NULL) */
476
477     rc = rrd_update_r(argv[optind], tmplt,
478                       argc - optind - 1, (const char **) (argv + optind + 1));
479   out:
480     if (tmplt != NULL)
481     {
482         free(tmplt);
483         tmplt = NULL;
484     }
485     if (opt_daemon != NULL)
486     {
487         free (opt_daemon);
488         opt_daemon = NULL;
489     }
490     return rc;
491 }
492
493 int rrd_update_r(
494     const char *filename,
495     const char *tmplt,
496     int argc,
497     const char **argv)
498 {
499     return _rrd_update(filename, tmplt, argc, argv, NULL);
500 }
501
502 int _rrd_update(
503     const char *filename,
504     const char *tmplt,
505     int argc,
506     const char **argv,
507     rrd_info_t * pcdp_summary)
508 {
509
510     int       arg_i = 2;
511
512     unsigned long rra_begin;    /* byte pointer to the rra
513                                  * area in the rrd file.  this
514                                  * pointer never changes value */
515     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
516                              * to the existing entry */
517     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
518                              * to the cdp values */
519
520     long     *tmpl_idx; /* index representing the settings
521                          * transported by the tmplt index */
522     unsigned long tmpl_cnt = 2; /* time and data */
523     rrd_t     rrd;
524     time_t    current_time = 0;
525     unsigned long current_time_usec = 0;    /* microseconds part of current time */
526     char    **updvals;
527     int       schedule_smooth = 0;
528
529     /* number of elapsed PDP steps since last update */
530     unsigned long *rra_step_cnt = NULL;
531
532     int       version;  /* rrd version */
533     rrd_file_t *rrd_file;
534     char     *arg_copy; /* for processing the argv */
535     unsigned long *skip_update; /* RRAs to advance but not write */
536
537     /* need at least 1 arguments: data. */
538     if (argc < 1) {
539         rrd_set_error("Not enough arguments");
540         goto err_out;
541     }
542
543     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
544         goto err_free;
545     }
546     /* We are now at the beginning of the rra's */
547     rra_begin = rrd_file->header_len;
548
549     version = atoi(rrd.stat_head->version);
550
551     initialize_time(&current_time, &current_time_usec, version);
552
553     /* get exclusive lock to whole file.
554      * lock gets removed when we close the file.
555      */
556     if (rrd_lock(rrd_file) != 0) {
557         rrd_set_error("could not lock RRD");
558         goto err_close;
559     }
560
561     if (allocate_data_structures(&rrd, &updvals,
562                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
563                                  &rra_step_cnt, &skip_update,
564                                  &pdp_new) == -1) {
565         goto err_close;
566     }
567
568     /* loop through the arguments. */
569     for (arg_i = 0; arg_i < argc; arg_i++) {
570         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
571             rrd_set_error("failed duplication argv entry");
572             break;
573         }
574         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
575                         &current_time, &current_time_usec, pdp_temp, pdp_new,
576                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
577                         &pcdp_summary, version, skip_update,
578                         &schedule_smooth) == -1) {
579             if (rrd_test_error()) { /* Should have error string always here */
580                 char     *save_error;
581
582                 /* Prepend file name to error message */
583                 if ((save_error = strdup(rrd_get_error())) != NULL) {
584                     rrd_set_error("%s: %s", filename, save_error);
585                     free(save_error);
586                 }
587             }
588             free(arg_copy);
589             break;
590         }
591         free(arg_copy);
592     }
593
594     free(rra_step_cnt);
595
596     /* if we got here and if there is an error and if the file has not been
597      * written to, then close things up and return. */
598     if (rrd_test_error()) {
599         goto err_free_structures;
600     }
601 #ifndef HAVE_MMAP
602     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
603         goto err_free_structures;
604     }
605 #endif
606
607     /* calling the smoothing code here guarantees at most one smoothing
608      * operation per rrd_update call. Unfortunately, it is possible with bulk
609      * updates, or a long-delayed update for smoothing to occur off-schedule.
610      * This really isn't critical except during the burn-in cycles. */
611     if (schedule_smooth) {
612         smooth_all_rras(&rrd, rrd_file, rra_begin);
613     }
614
615 /*    rrd_dontneed(rrd_file,&rrd); */
616     rrd_free(&rrd);
617     rrd_close(rrd_file);
618
619     free(pdp_new);
620     free(tmpl_idx);
621     free(pdp_temp);
622     free(skip_update);
623     free(updvals);
624     return 0;
625
626   err_free_structures:
627     free(pdp_new);
628     free(tmpl_idx);
629     free(pdp_temp);
630     free(skip_update);
631     free(updvals);
632   err_close:
633     rrd_close(rrd_file);
634   err_free:
635     rrd_free(&rrd);
636   err_out:
637     return -1;
638 }
639
640 /*
641  * get exclusive lock to whole file.
642  * lock gets removed when we close the file
643  *
644  * returns 0 on success
645  */
646 int rrd_lock(
647     rrd_file_t *file)
648 {
649     int       rcstat;
650
651     {
652 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
653         struct _stat st;
654
655         if (_fstat(file->fd, &st) == 0) {
656             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
657         } else {
658             rcstat = -1;
659         }
660 #else
661         struct flock lock;
662
663         lock.l_type = F_WRLCK;  /* exclusive write lock */
664         lock.l_len = 0; /* whole file */
665         lock.l_start = 0;   /* start of file */
666         lock.l_whence = SEEK_SET;   /* end of file */
667
668         rcstat = fcntl(file->fd, F_SETLK, &lock);
669 #endif
670     }
671
672     return (rcstat);
673 }
674
675 /*
676  * Allocate some important arrays used, and initialize the template.
677  *
678  * When it returns, either all of the structures are allocated
679  * or none of them are.
680  *
681  * Returns 0 on success, -1 on error.
682  */
683 static int allocate_data_structures(
684     rrd_t *rrd,
685     char ***updvals,
686     rrd_value_t **pdp_temp,
687     const char *tmplt,
688     long **tmpl_idx,
689     unsigned long *tmpl_cnt,
690     unsigned long **rra_step_cnt,
691     unsigned long **skip_update,
692     rrd_value_t **pdp_new)
693 {
694     unsigned  i, ii;
695     if ((*updvals = (char **) malloc(sizeof(char *)
696                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
697         rrd_set_error("allocating updvals pointer array.");
698         return -1;
699     }
700     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
701                                             * rrd->stat_head->ds_cnt)) ==
702         NULL) {
703         rrd_set_error("allocating pdp_temp.");
704         goto err_free_updvals;
705     }
706     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
707                                                  *
708                                                  rrd->stat_head->rra_cnt)) ==
709         NULL) {
710         rrd_set_error("allocating skip_update.");
711         goto err_free_pdp_temp;
712     }
713     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
714                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
715         rrd_set_error("allocating tmpl_idx.");
716         goto err_free_skip_update;
717     }
718     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
719                                                   *
720                                                   (rrd->stat_head->
721                                                    rra_cnt))) == NULL) {
722         rrd_set_error("allocating rra_step_cnt.");
723         goto err_free_tmpl_idx;
724     }
725
726     /* initialize tmplt redirector */
727     /* default config example (assume DS 1 is a CDEF DS)
728        tmpl_idx[0] -> 0; (time)
729        tmpl_idx[1] -> 1; (DS 0)
730        tmpl_idx[2] -> 3; (DS 2)
731        tmpl_idx[3] -> 4; (DS 3) */
732     (*tmpl_idx)[0] = 0; /* time */
733     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
734         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
735             (*tmpl_idx)[ii++] = i;
736     }
737     *tmpl_cnt = ii;
738
739     if (tmplt != NULL) {
740         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
741             goto err_free_rra_step_cnt;
742         }
743     }
744
745     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
746                                            * rrd->stat_head->ds_cnt)) == NULL) {
747         rrd_set_error("allocating pdp_new.");
748         goto err_free_rra_step_cnt;
749     }
750
751     return 0;
752
753   err_free_rra_step_cnt:
754     free(*rra_step_cnt);
755   err_free_tmpl_idx:
756     free(*tmpl_idx);
757   err_free_skip_update:
758     free(*skip_update);
759   err_free_pdp_temp:
760     free(*pdp_temp);
761   err_free_updvals:
762     free(*updvals);
763     return -1;
764 }
765
766 /*
767  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
768  *
769  * Returns 0 on success.
770  */
771 static int parse_template(
772     rrd_t *rrd,
773     const char *tmplt,
774     unsigned long *tmpl_cnt,
775     long *tmpl_idx)
776 {
777     char     *dsname, *tmplt_copy;
778     unsigned int tmpl_len, i;
779     int       ret = 0;
780
781     *tmpl_cnt = 1;      /* the first entry is the time */
782
783     /* we should work on a writeable copy here */
784     if ((tmplt_copy = strdup(tmplt)) == NULL) {
785         rrd_set_error("error copying tmplt '%s'", tmplt);
786         ret = -1;
787         goto out;
788     }
789
790     dsname = tmplt_copy;
791     tmpl_len = strlen(tmplt_copy);
792     for (i = 0; i <= tmpl_len; i++) {
793         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
794             tmplt_copy[i] = '\0';
795             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
796                 rrd_set_error("tmplt contains more DS definitions than RRD");
797                 ret = -1;
798                 goto out_free_tmpl_copy;
799             }
800             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
801                 rrd_set_error("unknown DS name '%s'", dsname);
802                 ret = -1;
803                 goto out_free_tmpl_copy;
804             }
805             /* go to the next entry on the tmplt_copy */
806             if (i < tmpl_len)
807                 dsname = &tmplt_copy[i + 1];
808         }
809     }
810   out_free_tmpl_copy:
811     free(tmplt_copy);
812   out:
813     return ret;
814 }
815
816 /*
817  * Parse an update string, updates the primary data points (PDPs)
818  * and consolidated data points (CDPs), and writes changes to the RRAs.
819  *
820  * Returns 0 on success, -1 on error.
821  */
822 static int process_arg(
823     char *step_start,
824     rrd_t *rrd,
825     rrd_file_t *rrd_file,
826     unsigned long rra_begin,
827     time_t *current_time,
828     unsigned long *current_time_usec,
829     rrd_value_t *pdp_temp,
830     rrd_value_t *pdp_new,
831     unsigned long *rra_step_cnt,
832     char **updvals,
833     long *tmpl_idx,
834     unsigned long tmpl_cnt,
835     rrd_info_t ** pcdp_summary,
836     int version,
837     unsigned long *skip_update,
838     int *schedule_smooth)
839 {
840     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
841
842     /* a vector of future Holt-Winters seasonal coefs */
843     unsigned long elapsed_pdp_st;
844
845     double    interval, pre_int, post_int;  /* interval between this and
846                                              * the last run */
847     unsigned long proc_pdp_cnt;
848
849     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
850                  current_time, current_time_usec, version) == -1) {
851         return -1;
852     }
853
854     interval = (double) (*current_time - rrd->live_head->last_up)
855         + (double) ((long) *current_time_usec -
856                     (long) rrd->live_head->last_up_usec) / 1e6f;
857
858     /* process the data sources and update the pdp_prep 
859      * area accordingly */
860     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
861         return -1;
862     }
863
864     elapsed_pdp_st = calculate_elapsed_steps(rrd,
865                                              *current_time,
866                                              *current_time_usec, interval,
867                                              &pre_int, &post_int,
868                                              &proc_pdp_cnt);
869
870     /* has a pdp_st moment occurred since the last run ? */
871     if (elapsed_pdp_st == 0) {
872         /* no we have not passed a pdp_st moment. therefore update is simple */
873         simple_update(rrd, interval, pdp_new);
874     } else {
875         /* an pdp_st has occurred. */
876         if (process_all_pdp_st(rrd, interval,
877                                pre_int, post_int,
878                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
879             return -1;
880         }
881         if (update_all_cdp_prep(rrd, rra_step_cnt,
882                                 rra_begin, rrd_file,
883                                 elapsed_pdp_st,
884                                 proc_pdp_cnt,
885                                 &last_seasonal_coef,
886                                 &seasonal_coef,
887                                 pdp_temp,
888                                 skip_update, schedule_smooth) == -1) {
889             goto err_free_coefficients;
890         }
891         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
892                                  elapsed_pdp_st, pdp_temp,
893                                  &seasonal_coef) == -1) {
894             goto err_free_coefficients;
895         }
896         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
897                           *current_time, skip_update,
898                           pcdp_summary) == -1) {
899             goto err_free_coefficients;
900         }
901     }                   /* endif a pdp_st has occurred */
902     rrd->live_head->last_up = *current_time;
903     rrd->live_head->last_up_usec = *current_time_usec;
904
905     if (version < 3) {
906         *rrd->legacy_last_up = rrd->live_head->last_up;
907     }
908     free(seasonal_coef);
909     free(last_seasonal_coef);
910     return 0;
911
912   err_free_coefficients:
913     free(seasonal_coef);
914     free(last_seasonal_coef);
915     return -1;
916 }
917
918 /*
919  * Parse a DS string (time + colon-separated values), storing the
920  * results in current_time, current_time_usec, and updvals.
921  *
922  * Returns 0 on success, -1 on error.
923  */
924 static int parse_ds(
925     rrd_t *rrd,
926     char **updvals,
927     long *tmpl_idx,
928     char *input,
929     unsigned long tmpl_cnt,
930     time_t *current_time,
931     unsigned long *current_time_usec,
932     int version)
933 {
934     char     *p;
935     unsigned long i;
936     char      timesyntax;
937
938     updvals[0] = input;
939     /* initialize all ds input to unknown except the first one
940        which has always got to be set */
941     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
942         updvals[i] = "U";
943
944     /* separate all ds elements; first must be examined separately
945        due to alternate time syntax */
946     if ((p = strchr(input, '@')) != NULL) {
947         timesyntax = '@';
948     } else if ((p = strchr(input, ':')) != NULL) {
949         timesyntax = ':';
950     } else {
951         rrd_set_error("expected timestamp not found in data source from %s",
952                       input);
953         return -1;
954     }
955     *p = '\0';
956     i = 1;
957     updvals[tmpl_idx[i++]] = p + 1;
958     while (*(++p)) {
959         if (*p == ':') {
960             *p = '\0';
961             if (i < tmpl_cnt) {
962                 updvals[tmpl_idx[i++]] = p + 1;
963             }
964         }
965     }
966
967     if (i != tmpl_cnt) {
968         rrd_set_error("expected %lu data source readings (got %lu) from %s",
969                       tmpl_cnt - 1, i, input);
970         return -1;
971     }
972
973     if (get_time_from_reading(rrd, timesyntax, updvals,
974                               current_time, current_time_usec,
975                               version) == -1) {
976         return -1;
977     }
978     return 0;
979 }
980
981 /*
982  * Parse the time in a DS string, store it in current_time and 
983  * current_time_usec and verify that it's later than the last
984  * update for this DS.
985  *
986  * Returns 0 on success, -1 on error.
987  */
988 static int get_time_from_reading(
989     rrd_t *rrd,
990     char timesyntax,
991     char **updvals,
992     time_t *current_time,
993     unsigned long *current_time_usec,
994     int version)
995 {
996     double    tmp;
997     char     *parsetime_error = NULL;
998     char     *old_locale;
999     rrd_time_value_t ds_tv;
1000     struct timeval tmp_time;    /* used for time conversion */
1001
1002     /* get the time from the reading ... handle N */
1003     if (timesyntax == '@') {    /* at-style */
1004         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1005             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1006             return -1;
1007         }
1008         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1009             ds_tv.type == RELATIVE_TO_START_TIME) {
1010             rrd_set_error("specifying time relative to the 'start' "
1011                           "or 'end' makes no sense here: %s", updvals[0]);
1012             return -1;
1013         }
1014         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1015         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1016     } else if (strcmp(updvals[0], "N") == 0) {
1017         gettimeofday(&tmp_time, 0);
1018         normalize_time(&tmp_time);
1019         *current_time = tmp_time.tv_sec;
1020         *current_time_usec = tmp_time.tv_usec;
1021     } else {
1022         old_locale = setlocale(LC_NUMERIC, "C");
1023         tmp = strtod(updvals[0], 0);
1024         setlocale(LC_NUMERIC, old_locale);
1025         *current_time = floor(tmp);
1026         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1027     }
1028     /* dont do any correction for old version RRDs */
1029     if (version < 3)
1030         *current_time_usec = 0;
1031
1032     if (*current_time < rrd->live_head->last_up ||
1033         (*current_time == rrd->live_head->last_up &&
1034          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1035         rrd_set_error("illegal attempt to update using time %ld when "
1036                       "last update time is %ld (minimum one second step)",
1037                       *current_time, rrd->live_head->last_up);
1038         return -1;
1039     }
1040     return 0;
1041 }
1042
1043 /*
1044  * Update pdp_new by interpreting the updvals according to the DS type
1045  * (COUNTER, GAUGE, etc.).
1046  *
1047  * Returns 0 on success, -1 on error.
1048  */
1049 static int update_pdp_prep(
1050     rrd_t *rrd,
1051     char **updvals,
1052     rrd_value_t *pdp_new,
1053     double interval)
1054 {
1055     unsigned long ds_idx;
1056     int       ii;
1057     char     *endptr;   /* used in the conversion */
1058     double    rate;
1059     char     *old_locale;
1060     enum dst_en dst_idx;
1061
1062     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1063         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1064
1065         /* make sure we do not build diffs with old last_ds values */
1066         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1067             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1068             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1069         }
1070
1071         /* NOTE: DST_CDEF should never enter this if block, because
1072          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1073          * accidently specified a value for the DST_CDEF. To handle this case,
1074          * an extra check is required. */
1075
1076         if ((updvals[ds_idx + 1][0] != 'U') &&
1077             (dst_idx != DST_CDEF) &&
1078             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1079             rate = DNAN;
1080
1081             /* pdp_new contains rate * time ... eg the bytes transferred during
1082              * the interval. Doing it this way saves a lot of math operations
1083              */
1084             switch (dst_idx) {
1085             case DST_COUNTER:
1086             case DST_DERIVE:
1087                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1088                     if ((updvals[ds_idx + 1][ii] < '0'
1089                          || updvals[ds_idx + 1][ii] > '9')
1090                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1091                         rrd_set_error("not a simple integer: '%s'",
1092                                       updvals[ds_idx + 1]);
1093                         return -1;
1094                     }
1095                 }
1096                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1097                     pdp_new[ds_idx] =
1098                         rrd_diff(updvals[ds_idx + 1],
1099                                  rrd->pdp_prep[ds_idx].last_ds);
1100                     if (dst_idx == DST_COUNTER) {
1101                         /* simple overflow catcher. This will fail
1102                          * terribly for non 32 or 64 bit counters
1103                          * ... are there any others in SNMP land?
1104                          */
1105                         if (pdp_new[ds_idx] < (double) 0.0)
1106                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1107                         if (pdp_new[ds_idx] < (double) 0.0)
1108                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1109                     }
1110                     rate = pdp_new[ds_idx] / interval;
1111                 } else {
1112                     pdp_new[ds_idx] = DNAN;
1113                 }
1114                 break;
1115             case DST_ABSOLUTE:
1116                 old_locale = setlocale(LC_NUMERIC, "C");
1117                 errno = 0;
1118                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1119                 setlocale(LC_NUMERIC, old_locale);
1120                 if (errno > 0) {
1121                     rrd_set_error("converting '%s' to float: %s",
1122                                   updvals[ds_idx + 1], rrd_strerror(errno));
1123                     return -1;
1124                 };
1125                 if (endptr[0] != '\0') {
1126                     rrd_set_error
1127                         ("conversion of '%s' to float not complete: tail '%s'",
1128                          updvals[ds_idx + 1], endptr);
1129                     return -1;
1130                 }
1131                 rate = pdp_new[ds_idx] / interval;
1132                 break;
1133             case DST_GAUGE:
1134                 errno = 0;
1135                 old_locale = setlocale(LC_NUMERIC, "C");
1136                 pdp_new[ds_idx] =
1137                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1138                 setlocale(LC_NUMERIC, old_locale);
1139                 if (errno) {
1140                     rrd_set_error("converting '%s' to float: %s",
1141                                   updvals[ds_idx + 1], rrd_strerror(errno));
1142                     return -1;
1143                 };
1144                 if (endptr[0] != '\0') {
1145                     rrd_set_error
1146                         ("conversion of '%s' to float not complete: tail '%s'",
1147                          updvals[ds_idx + 1], endptr);
1148                     return -1;
1149                 }
1150                 rate = pdp_new[ds_idx] / interval;
1151                 break;
1152             default:
1153                 rrd_set_error("rrd contains unknown DS type : '%s'",
1154                               rrd->ds_def[ds_idx].dst);
1155                 return -1;
1156             }
1157             /* break out of this for loop if the error string is set */
1158             if (rrd_test_error()) {
1159                 return -1;
1160             }
1161             /* make sure pdp_temp is neither too large or too small
1162              * if any of these occur it becomes unknown ...
1163              * sorry folks ... */
1164             if (!isnan(rate) &&
1165                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1166                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1167                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1168                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1169                 pdp_new[ds_idx] = DNAN;
1170             }
1171         } else {
1172             /* no news is news all the same */
1173             pdp_new[ds_idx] = DNAN;
1174         }
1175
1176
1177         /* make a copy of the command line argument for the next run */
1178 #ifdef DEBUG
1179         fprintf(stderr, "prep ds[%lu]\t"
1180                 "last_arg '%s'\t"
1181                 "this_arg '%s'\t"
1182                 "pdp_new %10.2f\n",
1183                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1184                 pdp_new[ds_idx]);
1185 #endif
1186         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1187                 LAST_DS_LEN - 1);
1188         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1189     }
1190     return 0;
1191 }
1192
1193 /*
1194  * How many PDP steps have elapsed since the last update? Returns the answer,
1195  * and stores the time between the last update and the last PDP in pre_time,
1196  * and the time between the last PDP and the current time in post_int.
1197  */
1198 static int calculate_elapsed_steps(
1199     rrd_t *rrd,
1200     unsigned long current_time,
1201     unsigned long current_time_usec,
1202     double interval,
1203     double *pre_int,
1204     double *post_int,
1205     unsigned long *proc_pdp_cnt)
1206 {
1207     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1208     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1209                                  * time */
1210     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1211                                  * when it was last updated */
1212     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1213
1214     /* when was the current pdp started */
1215     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1216     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1217
1218     /* when did the last pdp_st occur */
1219     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1220     occu_pdp_st = current_time - occu_pdp_age;
1221
1222     if (occu_pdp_st > proc_pdp_st) {
1223         /* OK we passed the pdp_st moment */
1224         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1225                                                                      * occurred before the latest
1226                                                                      * pdp_st moment*/
1227         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1228         *post_int = occu_pdp_age;   /* how much after it */
1229         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1230     } else {
1231         *pre_int = interval;
1232         *post_int = 0;
1233     }
1234
1235     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1236
1237 #ifdef DEBUG
1238     printf("proc_pdp_age %lu\t"
1239            "proc_pdp_st %lu\t"
1240            "occu_pfp_age %lu\t"
1241            "occu_pdp_st %lu\t"
1242            "int %lf\t"
1243            "pre_int %lf\t"
1244            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1245            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1246 #endif
1247
1248     /* compute the number of elapsed pdp_st moments */
1249     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1250 }
1251
1252 /*
1253  * Increment the PDP values by the values in pdp_new, or else initialize them.
1254  */
1255 static void simple_update(
1256     rrd_t *rrd,
1257     double interval,
1258     rrd_value_t *pdp_new)
1259 {
1260     int       i;
1261
1262     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1263         if (isnan(pdp_new[i])) {
1264             /* this is not really accurate if we use subsecond data arrival time
1265                should have thought of it when going subsecond resolution ...
1266                sorry next format change we will have it! */
1267             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1268                 floor(interval);
1269         } else {
1270             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1271                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1272             } else {
1273                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1274             }
1275         }
1276 #ifdef DEBUG
1277         fprintf(stderr,
1278                 "NO PDP  ds[%i]\t"
1279                 "value %10.2f\t"
1280                 "unkn_sec %5lu\n",
1281                 i,
1282                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1283                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1284 #endif
1285     }
1286 }
1287
1288 /*
1289  * Call process_pdp_st for each DS.
1290  *
1291  * Returns 0 on success, -1 on error.
1292  */
1293 static int process_all_pdp_st(
1294     rrd_t *rrd,
1295     double interval,
1296     double pre_int,
1297     double post_int,
1298     unsigned long elapsed_pdp_st,
1299     rrd_value_t *pdp_new,
1300     rrd_value_t *pdp_temp)
1301 {
1302     unsigned long ds_idx;
1303
1304     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1305        rate*seconds which occurred up to the last run.
1306        pdp_new[] contains rate*seconds from the latest run.
1307        pdp_temp[] will contain the rate for cdp */
1308
1309     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1310         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1311                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1312                            pdp_new, pdp_temp) == -1) {
1313             return -1;
1314         }
1315 #ifdef DEBUG
1316         fprintf(stderr, "PDP UPD ds[%lu]\t"
1317                 "elapsed_pdp_st %lu\t"
1318                 "pdp_temp %10.2f\t"
1319                 "new_prep %10.2f\t"
1320                 "new_unkn_sec %5lu\n",
1321                 ds_idx,
1322                 elapsed_pdp_st,
1323                 pdp_temp[ds_idx],
1324                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1325                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1326 #endif
1327     }
1328     return 0;
1329 }
1330
1331 /*
1332  * Process an update that occurs after one of the PDP moments.
1333  * Increments the PDP value, sets NAN if time greater than the
1334  * heartbeats have elapsed, processes CDEFs.
1335  *
1336  * Returns 0 on success, -1 on error.
1337  */
1338 static int process_pdp_st(
1339     rrd_t *rrd,
1340     unsigned long ds_idx,
1341     double interval,
1342     double pre_int,
1343     double post_int,
1344     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1345     rrd_value_t *pdp_new,
1346     rrd_value_t *pdp_temp)
1347 {
1348     int       i;
1349
1350     /* update pdp_prep to the current pdp_st. */
1351     double    pre_unknown = 0.0;
1352     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1353     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1354
1355     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1356
1357     rpnstack_init(&rpnstack);
1358
1359
1360     if (isnan(pdp_new[ds_idx])) {
1361         /* a final bit of unknown to be added before calculation
1362            we use a temporary variable for this so that we
1363            don't have to turn integer lines before using the value */
1364         pre_unknown = pre_int;
1365     } else {
1366         if (isnan(scratch[PDP_val].u_val)) {
1367             scratch[PDP_val].u_val = 0;
1368         }
1369         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1370     }
1371
1372     /* if too much of the pdp_prep is unknown we dump it */
1373     /* if the interval is larger thatn mrhb we get NAN */
1374     if ((interval > mrhb) ||
1375         (rrd->stat_head->pdp_step / 2.0 <
1376          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1377         pdp_temp[ds_idx] = DNAN;
1378     } else {
1379         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1380             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1381              pre_unknown);
1382     }
1383
1384     /* process CDEF data sources; remember each CDEF DS can
1385      * only reference other DS with a lower index number */
1386     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1387         rpnp_t   *rpnp;
1388
1389         rpnp =
1390             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1391         /* substitute data values for OP_VARIABLE nodes */
1392         for (i = 0; rpnp[i].op != OP_END; i++) {
1393             if (rpnp[i].op == OP_VARIABLE) {
1394                 rpnp[i].op = OP_NUMBER;
1395                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1396             }
1397         }
1398         /* run the rpn calculator */
1399         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1400             free(rpnp);
1401             rpnstack_free(&rpnstack);
1402             return -1;
1403         }
1404     }
1405
1406     /* make pdp_prep ready for the next run */
1407     if (isnan(pdp_new[ds_idx])) {
1408         /* this is not realy accurate if we use subsecond data arival time
1409            should have thought of it when going subsecond resolution ...
1410            sorry next format change we will have it! */
1411         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1412         scratch[PDP_val].u_val = DNAN;
1413     } else {
1414         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1415         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1416     }
1417     rpnstack_free(&rpnstack);
1418     return 0;
1419 }
1420
1421 /*
1422  * Iterate over all the RRAs for a given DS and:
1423  * 1. Decide whether to schedule a smooth later
1424  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1425  * 3. Update the CDP
1426  *
1427  * Returns 0 on success, -1 on error
1428  */
1429 static int update_all_cdp_prep(
1430     rrd_t *rrd,
1431     unsigned long *rra_step_cnt,
1432     unsigned long rra_begin,
1433     rrd_file_t *rrd_file,
1434     unsigned long elapsed_pdp_st,
1435     unsigned long proc_pdp_cnt,
1436     rrd_value_t **last_seasonal_coef,
1437     rrd_value_t **seasonal_coef,
1438     rrd_value_t *pdp_temp,
1439     unsigned long *skip_update,
1440     int *schedule_smooth)
1441 {
1442     unsigned long rra_idx;
1443
1444     /* index into the CDP scratch array */
1445     enum cf_en current_cf;
1446     unsigned long rra_start;
1447
1448     /* number of rows to be updated in an RRA for a data value. */
1449     unsigned long start_pdp_offset;
1450
1451     rra_start = rra_begin;
1452     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1453         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1454         start_pdp_offset =
1455             rrd->rra_def[rra_idx].pdp_cnt -
1456             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1457         skip_update[rra_idx] = 0;
1458         if (start_pdp_offset <= elapsed_pdp_st) {
1459             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1460                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1461         } else {
1462             rra_step_cnt[rra_idx] = 0;
1463         }
1464
1465         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1466             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1467              * so that they will be correct for the next observed value; note that for
1468              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1469              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1470             if (rra_step_cnt[rra_idx] > 1) {
1471                 skip_update[rra_idx] = 1;
1472                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1473                                 elapsed_pdp_st, last_seasonal_coef);
1474                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1475                                 elapsed_pdp_st + 1, seasonal_coef);
1476             }
1477             /* periodically run a smoother for seasonal effects */
1478             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1479 #ifdef DEBUG
1480                 fprintf(stderr,
1481                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1482                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1483                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1484                         u_cnt);
1485 #endif
1486                 *schedule_smooth = 1;
1487             }
1488         }
1489         if (rrd_test_error())
1490             return -1;
1491
1492         if (update_cdp_prep
1493             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1494              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1495              current_cf) == -1) {
1496             return -1;
1497         }
1498         rra_start +=
1499             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1500             sizeof(rrd_value_t);
1501     }
1502     return 0;
1503 }
1504
1505 /* 
1506  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1507  */
1508 static int do_schedule_smooth(
1509     rrd_t *rrd,
1510     unsigned long rra_idx,
1511     unsigned long elapsed_pdp_st)
1512 {
1513     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1514     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1515     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1516     unsigned long seasonal_smooth_idx =
1517         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1518     unsigned long *init_seasonal =
1519         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1520
1521     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1522      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1523      * really an RRA level, not a data source within RRA level parameter, but
1524      * the rra_def is read only for rrd_update (not flushed to disk). */
1525     if (*init_seasonal > BURNIN_CYCLES) {
1526         /* someone has no doubt invented a trick to deal with this wrap around,
1527          * but at least this code is clear. */
1528         if (seasonal_smooth_idx > cur_row) {
1529             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1530              * between PDP and CDP */
1531             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1532         }
1533         /* can't rely on negative numbers because we are working with
1534          * unsigned values */
1535         return (cur_row + elapsed_pdp_st >= row_cnt
1536                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1537     }
1538     /* mark off one of the burn-in cycles */
1539     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1540 }
1541
1542 /*
1543  * For a given RRA, iterate over the data sources and call the appropriate
1544  * consolidation function.
1545  *
1546  * Returns 0 on success, -1 on error.
1547  */
1548 static int update_cdp_prep(
1549     rrd_t *rrd,
1550     unsigned long elapsed_pdp_st,
1551     unsigned long start_pdp_offset,
1552     unsigned long *rra_step_cnt,
1553     int rra_idx,
1554     rrd_value_t *pdp_temp,
1555     rrd_value_t *last_seasonal_coef,
1556     rrd_value_t *seasonal_coef,
1557     int current_cf)
1558 {
1559     unsigned long ds_idx, cdp_idx;
1560
1561     /* update CDP_PREP areas */
1562     /* loop over data soures within each RRA */
1563     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1564
1565         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1566
1567         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1568             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1569                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1570                        elapsed_pdp_st, start_pdp_offset,
1571                        rrd->rra_def[rra_idx].pdp_cnt,
1572                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1573                        rra_idx, ds_idx);
1574         } else {
1575             /* Nothing to consolidate if there's one PDP per CDP. However, if
1576              * we've missed some PDPs, let's update null counters etc. */
1577             if (elapsed_pdp_st > 2) {
1578                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1579                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1580                           current_cf);
1581             }
1582         }
1583
1584         if (rrd_test_error())
1585             return -1;
1586     }                   /* endif data sources loop */
1587     return 0;
1588 }
1589
1590 /*
1591  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1592  * primary value, secondary value, and # of unknowns.
1593  */
1594 static void update_cdp(
1595     unival *scratch,
1596     int current_cf,
1597     rrd_value_t pdp_temp_val,
1598     unsigned long rra_step_cnt,
1599     unsigned long elapsed_pdp_st,
1600     unsigned long start_pdp_offset,
1601     unsigned long pdp_cnt,
1602     rrd_value_t xff,
1603     int i,
1604     int ii)
1605 {
1606     /* shorthand variables */
1607     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1608     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1609     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1610     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1611
1612     if (rra_step_cnt) {
1613         /* If we are in this block, as least 1 CDP value will be written to
1614          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1615          * to be written, then the "fill in" value is the CDP_secondary_val
1616          * entry. */
1617         if (isnan(pdp_temp_val)) {
1618             *cdp_unkn_pdp_cnt += start_pdp_offset;
1619             *cdp_secondary_val = DNAN;
1620         } else {
1621             /* CDP_secondary value is the RRA "fill in" value for intermediary
1622              * CDP data entries. No matter the CF, the value is the same because
1623              * the average, max, min, and last of a list of identical values is
1624              * the same, namely, the value itself. */
1625             *cdp_secondary_val = pdp_temp_val;
1626         }
1627
1628         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1629             *cdp_primary_val = DNAN;
1630             if (current_cf == CF_AVERAGE) {
1631                 *cdp_val =
1632                     initialize_average_carry_over(pdp_temp_val,
1633                                                   elapsed_pdp_st,
1634                                                   start_pdp_offset, pdp_cnt);
1635             } else {
1636                 *cdp_val = pdp_temp_val;
1637             }
1638         } else {
1639             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1640                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1641         }               /* endif meets xff value requirement for a valid value */
1642         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1643          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1644         if (isnan(pdp_temp_val))
1645             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1646         else
1647             *cdp_unkn_pdp_cnt = 0;
1648     } else {            /* rra_step_cnt[i]  == 0 */
1649
1650 #ifdef DEBUG
1651         if (isnan(*cdp_val)) {
1652             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1653                     i, ii);
1654         } else {
1655             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1656                     i, ii, *cdp_val);
1657         }
1658 #endif
1659         if (isnan(pdp_temp_val)) {
1660             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1661         } else {
1662             *cdp_val =
1663                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1664                                   current_cf, i, ii);
1665         }
1666     }
1667 }
1668
1669 /*
1670  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1671  * on the type of consolidation function.
1672  */
1673 static void initialize_cdp_val(
1674     unival *scratch,
1675     int current_cf,
1676     rrd_value_t pdp_temp_val,
1677     unsigned long elapsed_pdp_st,
1678     unsigned long start_pdp_offset,
1679     unsigned long pdp_cnt)
1680 {
1681     rrd_value_t cum_val, cur_val;
1682
1683     switch (current_cf) {
1684     case CF_AVERAGE:
1685         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1686         cur_val = IFDNAN(pdp_temp_val, 0.0);
1687         scratch[CDP_primary_val].u_val =
1688             (cum_val + cur_val * start_pdp_offset) /
1689             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1690         scratch[CDP_val].u_val =
1691             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1692                                           start_pdp_offset, pdp_cnt);
1693         break;
1694     case CF_MAXIMUM:
1695         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1696         cur_val = IFDNAN(pdp_temp_val, -DINF);
1697 #if 0
1698 #ifdef DEBUG
1699         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1700             fprintf(stderr,
1701                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1702                     i, ii);
1703             exit(-1);
1704         }
1705 #endif
1706 #endif
1707         if (cur_val > cum_val)
1708             scratch[CDP_primary_val].u_val = cur_val;
1709         else
1710             scratch[CDP_primary_val].u_val = cum_val;
1711         /* initialize carry over value */
1712         scratch[CDP_val].u_val = pdp_temp_val;
1713         break;
1714     case CF_MINIMUM:
1715         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1716         cur_val = IFDNAN(pdp_temp_val, DINF);
1717 #if 0
1718 #ifdef DEBUG
1719         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1720             fprintf(stderr,
1721                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1722                     ii);
1723             exit(-1);
1724         }
1725 #endif
1726 #endif
1727         if (cur_val < cum_val)
1728             scratch[CDP_primary_val].u_val = cur_val;
1729         else
1730             scratch[CDP_primary_val].u_val = cum_val;
1731         /* initialize carry over value */
1732         scratch[CDP_val].u_val = pdp_temp_val;
1733         break;
1734     case CF_LAST:
1735     default:
1736         scratch[CDP_primary_val].u_val = pdp_temp_val;
1737         /* initialize carry over value */
1738         scratch[CDP_val].u_val = pdp_temp_val;
1739         break;
1740     }
1741 }
1742
1743 /*
1744  * Update the consolidation function for Holt-Winters functions as
1745  * well as other functions that don't actually consolidate multiple
1746  * PDPs.
1747  */
1748 static void reset_cdp(
1749     rrd_t *rrd,
1750     unsigned long elapsed_pdp_st,
1751     rrd_value_t *pdp_temp,
1752     rrd_value_t *last_seasonal_coef,
1753     rrd_value_t *seasonal_coef,
1754     int rra_idx,
1755     int ds_idx,
1756     int cdp_idx,
1757     enum cf_en current_cf)
1758 {
1759     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1760
1761     switch (current_cf) {
1762     case CF_AVERAGE:
1763     default:
1764         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1765         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1766         break;
1767     case CF_SEASONAL:
1768     case CF_DEVSEASONAL:
1769         /* need to update cached seasonal values, so they are consistent
1770          * with the bulk update */
1771         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1772          * CDP_last_deviation are the same. */
1773         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1774         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1775         break;
1776     case CF_HWPREDICT:
1777     case CF_MHWPREDICT:
1778         /* need to update the null_count and last_null_count.
1779          * even do this for non-DNAN pdp_temp because the
1780          * algorithm is not learning from batch updates. */
1781         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1782         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1783         /* fall through */
1784     case CF_DEVPREDICT:
1785         scratch[CDP_primary_val].u_val = DNAN;
1786         scratch[CDP_secondary_val].u_val = DNAN;
1787         break;
1788     case CF_FAILURES:
1789         /* do not count missed bulk values as failures */
1790         scratch[CDP_primary_val].u_val = 0;
1791         scratch[CDP_secondary_val].u_val = 0;
1792         /* need to reset violations buffer.
1793          * could do this more carefully, but for now, just
1794          * assume a bulk update wipes away all violations. */
1795         erase_violations(rrd, cdp_idx, rra_idx);
1796         break;
1797     }
1798 }
1799
1800 static rrd_value_t initialize_average_carry_over(
1801     rrd_value_t pdp_temp_val,
1802     unsigned long elapsed_pdp_st,
1803     unsigned long start_pdp_offset,
1804     unsigned long pdp_cnt)
1805 {
1806     /* initialize carry over value */
1807     if (isnan(pdp_temp_val)) {
1808         return DNAN;
1809     }
1810     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1811 }
1812
1813 /*
1814  * Update or initialize a CDP value based on the consolidation
1815  * function.
1816  *
1817  * Returns the new value.
1818  */
1819 static rrd_value_t calculate_cdp_val(
1820     rrd_value_t cdp_val,
1821     rrd_value_t pdp_temp_val,
1822     unsigned long elapsed_pdp_st,
1823     int current_cf,
1824 #ifdef DEBUG
1825     int i,
1826     int ii
1827 #else
1828     int UNUSED(i),
1829     int UNUSED(ii)
1830 #endif
1831     )
1832 {
1833     if (isnan(cdp_val)) {
1834         if (current_cf == CF_AVERAGE) {
1835             pdp_temp_val *= elapsed_pdp_st;
1836         }
1837 #ifdef DEBUG
1838         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1839                 i, ii, pdp_temp_val);
1840 #endif
1841         return pdp_temp_val;
1842     }
1843     if (current_cf == CF_AVERAGE)
1844         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1845     if (current_cf == CF_MINIMUM)
1846         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1847     if (current_cf == CF_MAXIMUM)
1848         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1849
1850     return pdp_temp_val;
1851 }
1852
1853 /*
1854  * For each RRA, update the seasonal values and then call update_aberrant_CF
1855  * for each data source.
1856  *
1857  * Return 0 on success, -1 on error.
1858  */
1859 static int update_aberrant_cdps(
1860     rrd_t *rrd,
1861     rrd_file_t *rrd_file,
1862     unsigned long rra_begin,
1863     unsigned long elapsed_pdp_st,
1864     rrd_value_t *pdp_temp,
1865     rrd_value_t **seasonal_coef)
1866 {
1867     unsigned long rra_idx, ds_idx, j;
1868
1869     /* number of PDP steps since the last update that
1870      * are assigned to the first CDP to be generated
1871      * since the last update. */
1872     unsigned short scratch_idx;
1873     unsigned long rra_start;
1874     enum cf_en current_cf;
1875
1876     /* this loop is only entered if elapsed_pdp_st < 3 */
1877     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1878          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1879         rra_start = rra_begin;
1880         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1881             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1882                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1883                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1884                     if (scratch_idx == CDP_primary_val) {
1885                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1886                                         elapsed_pdp_st + 1, seasonal_coef);
1887                     } else {
1888                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1889                                         elapsed_pdp_st + 2, seasonal_coef);
1890                     }
1891                 }
1892                 if (rrd_test_error())
1893                     return -1;
1894                 /* loop over data soures within each RRA */
1895                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1896                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1897                                        rra_idx * (rrd->stat_head->ds_cnt) +
1898                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1899                                        *seasonal_coef);
1900                 }
1901             }
1902             rra_start += rrd->rra_def[rra_idx].row_cnt
1903                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1904         }
1905     }
1906     return 0;
1907 }
1908
1909 /* 
1910  * Move sequentially through the file, writing one RRA at a time.  Note this
1911  * architecture divorces the computation of CDP with flushing updated RRA
1912  * entries to disk.
1913  *
1914  * Return 0 on success, -1 on error.
1915  */
1916 static int write_to_rras(
1917     rrd_t *rrd,
1918     rrd_file_t *rrd_file,
1919     unsigned long *rra_step_cnt,
1920     unsigned long rra_begin,
1921     time_t current_time,
1922     unsigned long *skip_update,
1923     rrd_info_t ** pcdp_summary)
1924 {
1925     unsigned long rra_idx;
1926     unsigned long rra_start;
1927     time_t    rra_time = 0; /* time of update for a RRA */
1928
1929     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1930     
1931     /* Ready to write to disk */
1932     rra_start = rra_begin;
1933
1934     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1935         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1936         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1937
1938         /* for cdp_prep */
1939         unsigned short scratch_idx;
1940         unsigned long step_subtract;
1941
1942         for (scratch_idx = CDP_primary_val,
1943                  step_subtract = 1;
1944              rra_step_cnt[rra_idx] > 0;
1945              rra_step_cnt[rra_idx]--,
1946                  scratch_idx = CDP_secondary_val,
1947                  step_subtract = 2) {
1948
1949             off_t rra_pos_new;
1950 #ifdef DEBUG
1951             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1952 #endif
1953             /* increment, with wrap-around */
1954             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1955               rra_ptr->cur_row = 0;
1956
1957             /* we know what our position should be */
1958             rra_pos_new = rra_start
1959               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1960
1961             /* re-seek if the position is wrong or we wrapped around */
1962             if (rra_pos_new != rrd_file->pos) {
1963                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1964                     rrd_set_error("seek error in rrd");
1965                     return -1;
1966                 }
1967             }
1968 #ifdef DEBUG
1969             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1970 #endif
1971
1972             if (skip_update[rra_idx])
1973                 continue;
1974
1975             if (*pcdp_summary != NULL) {
1976                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1977
1978                 rra_time = (current_time - current_time % step_time)
1979                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1980             }
1981
1982             if (write_RRA_row
1983                 (rrd_file, rrd, rra_idx, scratch_idx,
1984                  pcdp_summary, rra_time) == -1)
1985                 return -1;
1986         }
1987
1988         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1989     } /* RRA LOOP */
1990
1991     return 0;
1992 }
1993
1994 /*
1995  * Write out one row of values (one value per DS) to the archive.
1996  *
1997  * Returns 0 on success, -1 on error.
1998  */
1999 static int write_RRA_row(
2000     rrd_file_t *rrd_file,
2001     rrd_t *rrd,
2002     unsigned long rra_idx,
2003     unsigned short CDP_scratch_idx,
2004     rrd_info_t ** pcdp_summary,
2005     time_t rra_time)
2006 {
2007     unsigned long ds_idx, cdp_idx;
2008     rrd_infoval_t iv;
2009
2010     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2011         /* compute the cdp index */
2012         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2013 #ifdef DEBUG
2014         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2015                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2016                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2017 #endif
2018         if (*pcdp_summary != NULL) {
2019             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2020             /* append info to the return hash */
2021             *pcdp_summary = rrd_info_push(*pcdp_summary,
2022                                           sprintf_alloc
2023                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2024                                            rrd->rra_def[rra_idx].cf_nam,
2025                                            rrd->rra_def[rra_idx].pdp_cnt,
2026                                            rrd->ds_def[ds_idx].ds_nam),
2027                                           RD_I_VAL, iv);
2028         }
2029         if (rrd_write(rrd_file,
2030                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2031                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2032             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2033             return -1;
2034         }
2035     }
2036     return 0;
2037 }
2038
2039 /*
2040  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2041  *
2042  * Returns 0 on success, -1 otherwise
2043  */
2044 static int smooth_all_rras(
2045     rrd_t *rrd,
2046     rrd_file_t *rrd_file,
2047     unsigned long rra_begin)
2048 {
2049     unsigned long rra_start = rra_begin;
2050     unsigned long rra_idx;
2051
2052     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2053         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2054             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2055 #ifdef DEBUG
2056             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2057 #endif
2058             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2059             if (rrd_test_error())
2060                 return -1;
2061         }
2062         rra_start += rrd->rra_def[rra_idx].row_cnt
2063             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2064     }
2065     return 0;
2066 }
2067
2068 #ifndef HAVE_MMAP
2069 /*
2070  * Flush changes to disk (unless we're using mmap)
2071  *
2072  * Returns 0 on success, -1 otherwise
2073  */
2074 static int write_changes_to_disk(
2075     rrd_t *rrd,
2076     rrd_file_t *rrd_file,
2077     int version)
2078 {
2079     /* we just need to write back the live header portion now */
2080     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2081                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2082                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2083                  SEEK_SET) != 0) {
2084         rrd_set_error("seek rrd for live header writeback");
2085         return -1;
2086     }
2087     if (version >= 3) {
2088         if (rrd_write(rrd_file, rrd->live_head,
2089                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2090             rrd_set_error("rrd_write live_head to rrd");
2091             return -1;
2092         }
2093     } else {
2094         if (rrd_write(rrd_file, rrd->legacy_last_up,
2095                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2096             rrd_set_error("rrd_write live_head to rrd");
2097             return -1;
2098         }
2099     }
2100
2101
2102     if (rrd_write(rrd_file, rrd->pdp_prep,
2103                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2104         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2105         rrd_set_error("rrd_write pdp_prep to rrd");
2106         return -1;
2107     }
2108
2109     if (rrd_write(rrd_file, rrd->cdp_prep,
2110                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2111                   rrd->stat_head->ds_cnt)
2112         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2113                       rrd->stat_head->ds_cnt)) {
2114
2115         rrd_set_error("rrd_write cdp_prep to rrd");
2116         return -1;
2117     }
2118
2119     if (rrd_write(rrd_file, rrd->rra_ptr,
2120                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2121         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2122         rrd_set_error("rrd_write rra_ptr to rrd");
2123         return -1;
2124     }
2125     return 0;
2126 }
2127 #endif