Renaming ntmake.pl to ntmake.PL (r1742) had unforseen side effects. At least
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     time_t *current_time,
102     unsigned long *current_time_usec,
103     rrd_value_t *pdp_temp,
104     rrd_value_t *pdp_new,
105     unsigned long *rra_step_cnt,
106     char **updvals,
107     long *tmpl_idx,
108     unsigned long tmpl_cnt,
109     rrd_info_t ** pcdp_summary,
110     int version,
111     unsigned long *skip_update,
112     int *schedule_smooth);
113
114 static int parse_ds(
115     rrd_t *rrd,
116     char **updvals,
117     long *tmpl_idx,
118     char *input,
119     unsigned long tmpl_cnt,
120     time_t *current_time,
121     unsigned long *current_time_usec,
122     int version);
123
124 static int get_time_from_reading(
125     rrd_t *rrd,
126     char timesyntax,
127     char **updvals,
128     time_t *current_time,
129     unsigned long *current_time_usec,
130     int version);
131
132 static int update_pdp_prep(
133     rrd_t *rrd,
134     char **updvals,
135     rrd_value_t *pdp_new,
136     double interval);
137
138 static int calculate_elapsed_steps(
139     rrd_t *rrd,
140     unsigned long current_time,
141     unsigned long current_time_usec,
142     double interval,
143     double *pre_int,
144     double *post_int,
145     unsigned long *proc_pdp_cnt);
146
147 static void simple_update(
148     rrd_t *rrd,
149     double interval,
150     rrd_value_t *pdp_new);
151
152 static int process_all_pdp_st(
153     rrd_t *rrd,
154     double interval,
155     double pre_int,
156     double post_int,
157     unsigned long elapsed_pdp_st,
158     rrd_value_t *pdp_new,
159     rrd_value_t *pdp_temp);
160
161 static int process_pdp_st(
162     rrd_t *rrd,
163     unsigned long ds_idx,
164     double interval,
165     double pre_int,
166     double post_int,
167     long diff_pdp_st,
168     rrd_value_t *pdp_new,
169     rrd_value_t *pdp_temp);
170
171 static int update_all_cdp_prep(
172     rrd_t *rrd,
173     unsigned long *rra_step_cnt,
174     unsigned long rra_begin,
175     rrd_file_t *rrd_file,
176     unsigned long elapsed_pdp_st,
177     unsigned long proc_pdp_cnt,
178     rrd_value_t **last_seasonal_coef,
179     rrd_value_t **seasonal_coef,
180     rrd_value_t *pdp_temp,
181     unsigned long *skip_update,
182     int *schedule_smooth);
183
184 static int do_schedule_smooth(
185     rrd_t *rrd,
186     unsigned long rra_idx,
187     unsigned long elapsed_pdp_st);
188
189 static int update_cdp_prep(
190     rrd_t *rrd,
191     unsigned long elapsed_pdp_st,
192     unsigned long start_pdp_offset,
193     unsigned long *rra_step_cnt,
194     int rra_idx,
195     rrd_value_t *pdp_temp,
196     rrd_value_t *last_seasonal_coef,
197     rrd_value_t *seasonal_coef,
198     int current_cf);
199
200 static void update_cdp(
201     unival *scratch,
202     int current_cf,
203     rrd_value_t pdp_temp_val,
204     unsigned long rra_step_cnt,
205     unsigned long elapsed_pdp_st,
206     unsigned long start_pdp_offset,
207     unsigned long pdp_cnt,
208     rrd_value_t xff,
209     int i,
210     int ii);
211
212 static void initialize_cdp_val(
213     unival *scratch,
214     int current_cf,
215     rrd_value_t pdp_temp_val,
216     unsigned long elapsed_pdp_st,
217     unsigned long start_pdp_offset,
218     unsigned long pdp_cnt);
219
220 static void reset_cdp(
221     rrd_t *rrd,
222     unsigned long elapsed_pdp_st,
223     rrd_value_t *pdp_temp,
224     rrd_value_t *last_seasonal_coef,
225     rrd_value_t *seasonal_coef,
226     int rra_idx,
227     int ds_idx,
228     int cdp_idx,
229     enum cf_en current_cf);
230
231 static rrd_value_t initialize_average_carry_over(
232     rrd_value_t pdp_temp_val,
233     unsigned long elapsed_pdp_st,
234     unsigned long start_pdp_offset,
235     unsigned long pdp_cnt);
236
237 static rrd_value_t calculate_cdp_val(
238     rrd_value_t cdp_val,
239     rrd_value_t pdp_temp_val,
240     unsigned long elapsed_pdp_st,
241     int current_cf,
242     int i,
243     int ii);
244
245 static int update_aberrant_cdps(
246     rrd_t *rrd,
247     rrd_file_t *rrd_file,
248     unsigned long rra_begin,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     time_t current_time,
259     unsigned long *skip_update,
260     rrd_info_t ** pcdp_summary);
261
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned short CDP_scratch_idx,
267     rrd_info_t ** pcdp_summary,
268     time_t rra_time);
269
270 static int smooth_all_rras(
271     rrd_t *rrd,
272     rrd_file_t *rrd_file,
273     unsigned long rra_begin);
274
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277     rrd_t *rrd,
278     rrd_file_t *rrd_file,
279     int version);
280 #endif
281
282 /*
283  * normalize time as returned by gettimeofday. usec part must
284  * be always >= 0
285  */
286 static inline void normalize_time(
287     struct timeval *t)
288 {
289     if (t->tv_usec < 0) {
290         t->tv_sec--;
291         t->tv_usec += 1e6L;
292     }
293 }
294
295 /*
296  * Sets current_time and current_time_usec based on the current time.
297  * current_time_usec is set to 0 if the version number is 1 or 2.
298  */
299 static inline void initialize_time(
300     time_t *current_time,
301     unsigned long *current_time_usec,
302     int version)
303 {
304     struct timeval tmp_time;    /* used for time conversion */
305
306     gettimeofday(&tmp_time, 0);
307     normalize_time(&tmp_time);
308     *current_time = tmp_time.tv_sec;
309     if (version >= 3) {
310         *current_time_usec = tmp_time.tv_usec;
311     } else {
312         *current_time_usec = 0;
313     }
314 }
315
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317
318 rrd_info_t *rrd_update_v(
319     int argc,
320     char **argv)
321 {
322     char     *tmplt = NULL;
323     rrd_info_t *result = NULL;
324     rrd_infoval_t rc;
325     char *opt_daemon = NULL;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
330
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
334
335     while (1) {
336         int       option_index = 0;
337         int       opt;
338
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340
341         if (opt == EOF)
342             break;
343
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
348
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
354
355     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356     if (opt_daemon != NULL) {
357         rrd_set_error ("The \"%s\" environment variable is defined, "
358                 "but \"%s\" cannot work with rrdcached. Either unset "
359                 "the environment variable or use \"update\" instead.",
360                 ENV_RRDCACHED_ADDRESS, argv[0]);
361         goto end_tag;
362     }
363
364     /* need at least 2 arguments: filename, data. */
365     if (argc - optind < 2) {
366         rrd_set_error("Not enough arguments");
367         goto end_tag;
368     }
369     rc.u_int = 0;
370     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371     rc.u_int = _rrd_update(argv[optind], tmplt,
372                            argc - optind - 1,
373                            (const char **) (argv + optind + 1), result);
374     result->value.u_int = rc.u_int;
375   end_tag:
376     return result;
377 }
378
379 int rrd_update(
380     int argc,
381     char **argv)
382 {
383     struct option long_options[] = {
384         {"template", required_argument, 0, 't'},
385         {"daemon",   required_argument, 0, 'd'},
386         {0, 0, 0, 0}
387     };
388     int       option_index = 0;
389     int       opt;
390     char     *tmplt = NULL;
391     int       rc = -1;
392     char     *opt_daemon = NULL;
393
394     optind = 0;
395     opterr = 0;         /* initialize getopt */
396
397     while (1) {
398         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399
400         if (opt == EOF)
401             break;
402
403         switch (opt) {
404         case 't':
405             tmplt = strdup(optarg);
406             break;
407
408         case 'd':
409             if (opt_daemon != NULL)
410                 free (opt_daemon);
411             opt_daemon = strdup (optarg);
412             if (opt_daemon == NULL)
413             {
414                 rrd_set_error("strdup failed.");
415                 goto out;
416             }
417             break;
418
419         case '?':
420             rrd_set_error("unknown option '%s'", argv[optind - 1]);
421             goto out;
422         }
423     }
424
425     /* need at least 2 arguments: filename, data. */
426     if (argc - optind < 2) {
427         rrd_set_error("Not enough arguments");
428         goto out;
429     }
430
431     {   /* try to connect to rrdcached */
432         int status = rrdc_connect(opt_daemon);
433         if (status != 0) return status;
434     }
435
436     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437     {
438         rrd_set_error("The caching daemon cannot be used together with "
439                 "templates yet.");
440         goto out;
441     }
442
443     if (! rrdc_is_connected(opt_daemon))
444     {
445       rc = rrd_update_r(argv[optind], tmplt,
446                         argc - optind - 1, (const char **) (argv + optind + 1));
447     }
448     else /* we are connected */
449     {
450         rc = rrdc_update (argv[optind], /* file */
451                           argc - optind - 1, /* values_num */
452                           (const char *const *) (argv + optind + 1)); /* values */
453         if (rc > 0)
454             rrd_set_error("Failed sending the values to rrdcached: %s",
455                           rrd_strerror (rc));
456     }
457
458   out:
459     if (tmplt != NULL)
460     {
461         free(tmplt);
462         tmplt = NULL;
463     }
464     if (opt_daemon != NULL)
465     {
466         free (opt_daemon);
467         opt_daemon = NULL;
468     }
469     return rc;
470 }
471
472 int rrd_update_r(
473     const char *filename,
474     const char *tmplt,
475     int argc,
476     const char **argv)
477 {
478     return _rrd_update(filename, tmplt, argc, argv, NULL);
479 }
480
481 int _rrd_update(
482     const char *filename,
483     const char *tmplt,
484     int argc,
485     const char **argv,
486     rrd_info_t * pcdp_summary)
487 {
488
489     int       arg_i = 2;
490
491     unsigned long rra_begin;    /* byte pointer to the rra
492                                  * area in the rrd file.  this
493                                  * pointer never changes value */
494     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
495                              * to the existing entry */
496     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
497                              * to the cdp values */
498
499     long     *tmpl_idx; /* index representing the settings
500                          * transported by the tmplt index */
501     unsigned long tmpl_cnt = 2; /* time and data */
502     rrd_t     rrd;
503     time_t    current_time = 0;
504     unsigned long current_time_usec = 0;    /* microseconds part of current time */
505     char    **updvals;
506     int       schedule_smooth = 0;
507
508     /* number of elapsed PDP steps since last update */
509     unsigned long *rra_step_cnt = NULL;
510
511     int       version;  /* rrd version */
512     rrd_file_t *rrd_file;
513     char     *arg_copy; /* for processing the argv */
514     unsigned long *skip_update; /* RRAs to advance but not write */
515
516     /* need at least 1 arguments: data. */
517     if (argc < 1) {
518         rrd_set_error("Not enough arguments");
519         goto err_out;
520     }
521
522     rrd_init(&rrd);
523     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
524         goto err_free;
525     }
526     /* We are now at the beginning of the rra's */
527     rra_begin = rrd_file->header_len;
528
529     version = atoi(rrd.stat_head->version);
530
531     initialize_time(&current_time, &current_time_usec, version);
532
533     /* get exclusive lock to whole file.
534      * lock gets removed when we close the file.
535      */
536     if (rrd_lock(rrd_file) != 0) {
537         rrd_set_error("could not lock RRD");
538         goto err_close;
539     }
540
541     if (allocate_data_structures(&rrd, &updvals,
542                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
543                                  &rra_step_cnt, &skip_update,
544                                  &pdp_new) == -1) {
545         goto err_close;
546     }
547
548     /* loop through the arguments. */
549     for (arg_i = 0; arg_i < argc; arg_i++) {
550         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
551             rrd_set_error("failed duplication argv entry");
552             break;
553         }
554         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
555                         &current_time, &current_time_usec, pdp_temp, pdp_new,
556                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
557                         &pcdp_summary, version, skip_update,
558                         &schedule_smooth) == -1) {
559             if (rrd_test_error()) { /* Should have error string always here */
560                 char     *save_error;
561
562                 /* Prepend file name to error message */
563                 if ((save_error = strdup(rrd_get_error())) != NULL) {
564                     rrd_set_error("%s: %s", filename, save_error);
565                     free(save_error);
566                 }
567             }
568             free(arg_copy);
569             break;
570         }
571         free(arg_copy);
572     }
573
574     free(rra_step_cnt);
575
576     /* if we got here and if there is an error and if the file has not been
577      * written to, then close things up and return. */
578     if (rrd_test_error()) {
579         goto err_free_structures;
580     }
581 #ifndef HAVE_MMAP
582     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
583         goto err_free_structures;
584     }
585 #endif
586
587     /* calling the smoothing code here guarantees at most one smoothing
588      * operation per rrd_update call. Unfortunately, it is possible with bulk
589      * updates, or a long-delayed update for smoothing to occur off-schedule.
590      * This really isn't critical except during the burn-in cycles. */
591     if (schedule_smooth) {
592         smooth_all_rras(&rrd, rrd_file, rra_begin);
593     }
594
595 /*    rrd_dontneed(rrd_file,&rrd); */
596     rrd_free(&rrd);
597     rrd_close(rrd_file);
598
599     free(pdp_new);
600     free(tmpl_idx);
601     free(pdp_temp);
602     free(skip_update);
603     free(updvals);
604     return 0;
605
606   err_free_structures:
607     free(pdp_new);
608     free(tmpl_idx);
609     free(pdp_temp);
610     free(skip_update);
611     free(updvals);
612   err_close:
613     rrd_close(rrd_file);
614   err_free:
615     rrd_free(&rrd);
616   err_out:
617     return -1;
618 }
619
620 /*
621  * Allocate some important arrays used, and initialize the template.
622  *
623  * When it returns, either all of the structures are allocated
624  * or none of them are.
625  *
626  * Returns 0 on success, -1 on error.
627  */
628 static int allocate_data_structures(
629     rrd_t *rrd,
630     char ***updvals,
631     rrd_value_t **pdp_temp,
632     const char *tmplt,
633     long **tmpl_idx,
634     unsigned long *tmpl_cnt,
635     unsigned long **rra_step_cnt,
636     unsigned long **skip_update,
637     rrd_value_t **pdp_new)
638 {
639     unsigned  i, ii;
640     if ((*updvals = (char **) malloc(sizeof(char *)
641                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
642         rrd_set_error("allocating updvals pointer array.");
643         return -1;
644     }
645     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
646                                             * rrd->stat_head->ds_cnt)) ==
647         NULL) {
648         rrd_set_error("allocating pdp_temp.");
649         goto err_free_updvals;
650     }
651     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
652                                                  *
653                                                  rrd->stat_head->rra_cnt)) ==
654         NULL) {
655         rrd_set_error("allocating skip_update.");
656         goto err_free_pdp_temp;
657     }
658     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
659                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
660         rrd_set_error("allocating tmpl_idx.");
661         goto err_free_skip_update;
662     }
663     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
664                                                   *
665                                                   (rrd->stat_head->
666                                                    rra_cnt))) == NULL) {
667         rrd_set_error("allocating rra_step_cnt.");
668         goto err_free_tmpl_idx;
669     }
670
671     /* initialize tmplt redirector */
672     /* default config example (assume DS 1 is a CDEF DS)
673        tmpl_idx[0] -> 0; (time)
674        tmpl_idx[1] -> 1; (DS 0)
675        tmpl_idx[2] -> 3; (DS 2)
676        tmpl_idx[3] -> 4; (DS 3) */
677     (*tmpl_idx)[0] = 0; /* time */
678     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
679         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
680             (*tmpl_idx)[ii++] = i;
681     }
682     *tmpl_cnt = ii;
683
684     if (tmplt != NULL) {
685         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
686             goto err_free_rra_step_cnt;
687         }
688     }
689
690     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
691                                            * rrd->stat_head->ds_cnt)) == NULL) {
692         rrd_set_error("allocating pdp_new.");
693         goto err_free_rra_step_cnt;
694     }
695
696     return 0;
697
698   err_free_rra_step_cnt:
699     free(*rra_step_cnt);
700   err_free_tmpl_idx:
701     free(*tmpl_idx);
702   err_free_skip_update:
703     free(*skip_update);
704   err_free_pdp_temp:
705     free(*pdp_temp);
706   err_free_updvals:
707     free(*updvals);
708     return -1;
709 }
710
711 /*
712  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
713  *
714  * Returns 0 on success.
715  */
716 static int parse_template(
717     rrd_t *rrd,
718     const char *tmplt,
719     unsigned long *tmpl_cnt,
720     long *tmpl_idx)
721 {
722     char     *dsname, *tmplt_copy;
723     unsigned int tmpl_len, i;
724     int       ret = 0;
725
726     *tmpl_cnt = 1;      /* the first entry is the time */
727
728     /* we should work on a writeable copy here */
729     if ((tmplt_copy = strdup(tmplt)) == NULL) {
730         rrd_set_error("error copying tmplt '%s'", tmplt);
731         ret = -1;
732         goto out;
733     }
734
735     dsname = tmplt_copy;
736     tmpl_len = strlen(tmplt_copy);
737     for (i = 0; i <= tmpl_len; i++) {
738         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
739             tmplt_copy[i] = '\0';
740             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
741                 rrd_set_error("tmplt contains more DS definitions than RRD");
742                 ret = -1;
743                 goto out_free_tmpl_copy;
744             }
745             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
746                 rrd_set_error("unknown DS name '%s'", dsname);
747                 ret = -1;
748                 goto out_free_tmpl_copy;
749             }
750             /* go to the next entry on the tmplt_copy */
751             if (i < tmpl_len)
752                 dsname = &tmplt_copy[i + 1];
753         }
754     }
755   out_free_tmpl_copy:
756     free(tmplt_copy);
757   out:
758     return ret;
759 }
760
761 /*
762  * Parse an update string, updates the primary data points (PDPs)
763  * and consolidated data points (CDPs), and writes changes to the RRAs.
764  *
765  * Returns 0 on success, -1 on error.
766  */
767 static int process_arg(
768     char *step_start,
769     rrd_t *rrd,
770     rrd_file_t *rrd_file,
771     unsigned long rra_begin,
772     time_t *current_time,
773     unsigned long *current_time_usec,
774     rrd_value_t *pdp_temp,
775     rrd_value_t *pdp_new,
776     unsigned long *rra_step_cnt,
777     char **updvals,
778     long *tmpl_idx,
779     unsigned long tmpl_cnt,
780     rrd_info_t ** pcdp_summary,
781     int version,
782     unsigned long *skip_update,
783     int *schedule_smooth)
784 {
785     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
786
787     /* a vector of future Holt-Winters seasonal coefs */
788     unsigned long elapsed_pdp_st;
789
790     double    interval, pre_int, post_int;  /* interval between this and
791                                              * the last run */
792     unsigned long proc_pdp_cnt;
793
794     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
795                  current_time, current_time_usec, version) == -1) {
796         return -1;
797     }
798
799     interval = (double) (*current_time - rrd->live_head->last_up)
800         + (double) ((long) *current_time_usec -
801                     (long) rrd->live_head->last_up_usec) / 1e6f;
802
803     /* process the data sources and update the pdp_prep 
804      * area accordingly */
805     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
806         return -1;
807     }
808
809     elapsed_pdp_st = calculate_elapsed_steps(rrd,
810                                              *current_time,
811                                              *current_time_usec, interval,
812                                              &pre_int, &post_int,
813                                              &proc_pdp_cnt);
814
815     /* has a pdp_st moment occurred since the last run ? */
816     if (elapsed_pdp_st == 0) {
817         /* no we have not passed a pdp_st moment. therefore update is simple */
818         simple_update(rrd, interval, pdp_new);
819     } else {
820         /* an pdp_st has occurred. */
821         if (process_all_pdp_st(rrd, interval,
822                                pre_int, post_int,
823                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
824             return -1;
825         }
826         if (update_all_cdp_prep(rrd, rra_step_cnt,
827                                 rra_begin, rrd_file,
828                                 elapsed_pdp_st,
829                                 proc_pdp_cnt,
830                                 &last_seasonal_coef,
831                                 &seasonal_coef,
832                                 pdp_temp,
833                                 skip_update, schedule_smooth) == -1) {
834             goto err_free_coefficients;
835         }
836         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
837                                  elapsed_pdp_st, pdp_temp,
838                                  &seasonal_coef) == -1) {
839             goto err_free_coefficients;
840         }
841         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
842                           *current_time, skip_update,
843                           pcdp_summary) == -1) {
844             goto err_free_coefficients;
845         }
846     }                   /* endif a pdp_st has occurred */
847     rrd->live_head->last_up = *current_time;
848     rrd->live_head->last_up_usec = *current_time_usec;
849
850     if (version < 3) {
851         *rrd->legacy_last_up = rrd->live_head->last_up;
852     }
853     free(seasonal_coef);
854     free(last_seasonal_coef);
855     return 0;
856
857   err_free_coefficients:
858     free(seasonal_coef);
859     free(last_seasonal_coef);
860     return -1;
861 }
862
863 /*
864  * Parse a DS string (time + colon-separated values), storing the
865  * results in current_time, current_time_usec, and updvals.
866  *
867  * Returns 0 on success, -1 on error.
868  */
869 static int parse_ds(
870     rrd_t *rrd,
871     char **updvals,
872     long *tmpl_idx,
873     char *input,
874     unsigned long tmpl_cnt,
875     time_t *current_time,
876     unsigned long *current_time_usec,
877     int version)
878 {
879     char     *p;
880     unsigned long i;
881     char      timesyntax;
882
883     updvals[0] = input;
884     /* initialize all ds input to unknown except the first one
885        which has always got to be set */
886     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
887         updvals[i] = "U";
888
889     /* separate all ds elements; first must be examined separately
890        due to alternate time syntax */
891     if ((p = strchr(input, '@')) != NULL) {
892         timesyntax = '@';
893     } else if ((p = strchr(input, ':')) != NULL) {
894         timesyntax = ':';
895     } else {
896         rrd_set_error("expected timestamp not found in data source from %s",
897                       input);
898         return -1;
899     }
900     *p = '\0';
901     i = 1;
902     updvals[tmpl_idx[i++]] = p + 1;
903     while (*(++p)) {
904         if (*p == ':') {
905             *p = '\0';
906             if (i < tmpl_cnt) {
907                 updvals[tmpl_idx[i++]] = p + 1;
908             }
909         }
910     }
911
912     if (i != tmpl_cnt) {
913         rrd_set_error("expected %lu data source readings (got %lu) from %s",
914                       tmpl_cnt - 1, i, input);
915         return -1;
916     }
917
918     if (get_time_from_reading(rrd, timesyntax, updvals,
919                               current_time, current_time_usec,
920                               version) == -1) {
921         return -1;
922     }
923     return 0;
924 }
925
926 /*
927  * Parse the time in a DS string, store it in current_time and 
928  * current_time_usec and verify that it's later than the last
929  * update for this DS.
930  *
931  * Returns 0 on success, -1 on error.
932  */
933 static int get_time_from_reading(
934     rrd_t *rrd,
935     char timesyntax,
936     char **updvals,
937     time_t *current_time,
938     unsigned long *current_time_usec,
939     int version)
940 {
941     double    tmp;
942     char     *parsetime_error = NULL;
943     char     *old_locale;
944     rrd_time_value_t ds_tv;
945     struct timeval tmp_time;    /* used for time conversion */
946
947     /* get the time from the reading ... handle N */
948     if (timesyntax == '@') {    /* at-style */
949         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
950             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
951             return -1;
952         }
953         if (ds_tv.type == RELATIVE_TO_END_TIME ||
954             ds_tv.type == RELATIVE_TO_START_TIME) {
955             rrd_set_error("specifying time relative to the 'start' "
956                           "or 'end' makes no sense here: %s", updvals[0]);
957             return -1;
958         }
959         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
960         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
961     } else if (strcmp(updvals[0], "N") == 0) {
962         gettimeofday(&tmp_time, 0);
963         normalize_time(&tmp_time);
964         *current_time = tmp_time.tv_sec;
965         *current_time_usec = tmp_time.tv_usec;
966     } else {
967         old_locale = setlocale(LC_NUMERIC, "C");
968         tmp = strtod(updvals[0], 0);
969         setlocale(LC_NUMERIC, old_locale);
970         *current_time = floor(tmp);
971         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
972     }
973     /* dont do any correction for old version RRDs */
974     if (version < 3)
975         *current_time_usec = 0;
976
977     if (*current_time < rrd->live_head->last_up ||
978         (*current_time == rrd->live_head->last_up &&
979          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
980         rrd_set_error("illegal attempt to update using time %ld when "
981                       "last update time is %ld (minimum one second step)",
982                       *current_time, rrd->live_head->last_up);
983         return -1;
984     }
985     return 0;
986 }
987
988 /*
989  * Update pdp_new by interpreting the updvals according to the DS type
990  * (COUNTER, GAUGE, etc.).
991  *
992  * Returns 0 on success, -1 on error.
993  */
994 static int update_pdp_prep(
995     rrd_t *rrd,
996     char **updvals,
997     rrd_value_t *pdp_new,
998     double interval)
999 {
1000     unsigned long ds_idx;
1001     int       ii;
1002     char     *endptr;   /* used in the conversion */
1003     double    rate;
1004     char     *old_locale;
1005     enum dst_en dst_idx;
1006
1007     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1008         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1009
1010         /* make sure we do not build diffs with old last_ds values */
1011         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1012             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1013             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1014         }
1015
1016         /* NOTE: DST_CDEF should never enter this if block, because
1017          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1018          * accidently specified a value for the DST_CDEF. To handle this case,
1019          * an extra check is required. */
1020
1021         if ((updvals[ds_idx + 1][0] != 'U') &&
1022             (dst_idx != DST_CDEF) &&
1023             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1024             rate = DNAN;
1025
1026             /* pdp_new contains rate * time ... eg the bytes transferred during
1027              * the interval. Doing it this way saves a lot of math operations
1028              */
1029             switch (dst_idx) {
1030             case DST_COUNTER:
1031             case DST_DERIVE:
1032                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1033                     if ((updvals[ds_idx + 1][ii] < '0'
1034                          || updvals[ds_idx + 1][ii] > '9')
1035                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1036                         rrd_set_error("not a simple integer: '%s'",
1037                                       updvals[ds_idx + 1]);
1038                         return -1;
1039                     }
1040                 }
1041                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1042                     pdp_new[ds_idx] =
1043                         rrd_diff(updvals[ds_idx + 1],
1044                                  rrd->pdp_prep[ds_idx].last_ds);
1045                     if (dst_idx == DST_COUNTER) {
1046                         /* simple overflow catcher. This will fail
1047                          * terribly for non 32 or 64 bit counters
1048                          * ... are there any others in SNMP land?
1049                          */
1050                         if (pdp_new[ds_idx] < (double) 0.0)
1051                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1052                         if (pdp_new[ds_idx] < (double) 0.0)
1053                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1054                     }
1055                     rate = pdp_new[ds_idx] / interval;
1056                 } else {
1057                     pdp_new[ds_idx] = DNAN;
1058                 }
1059                 break;
1060             case DST_ABSOLUTE:
1061                 old_locale = setlocale(LC_NUMERIC, "C");
1062                 errno = 0;
1063                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1064                 setlocale(LC_NUMERIC, old_locale);
1065                 if (errno > 0) {
1066                     rrd_set_error("converting '%s' to float: %s",
1067                                   updvals[ds_idx + 1], rrd_strerror(errno));
1068                     return -1;
1069                 };
1070                 if (endptr[0] != '\0') {
1071                     rrd_set_error
1072                         ("conversion of '%s' to float not complete: tail '%s'",
1073                          updvals[ds_idx + 1], endptr);
1074                     return -1;
1075                 }
1076                 rate = pdp_new[ds_idx] / interval;
1077                 break;
1078             case DST_GAUGE:
1079                 errno = 0;
1080                 old_locale = setlocale(LC_NUMERIC, "C");
1081                 pdp_new[ds_idx] =
1082                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1083                 setlocale(LC_NUMERIC, old_locale);
1084                 if (errno) {
1085                     rrd_set_error("converting '%s' to float: %s",
1086                                   updvals[ds_idx + 1], rrd_strerror(errno));
1087                     return -1;
1088                 };
1089                 if (endptr[0] != '\0') {
1090                     rrd_set_error
1091                         ("conversion of '%s' to float not complete: tail '%s'",
1092                          updvals[ds_idx + 1], endptr);
1093                     return -1;
1094                 }
1095                 rate = pdp_new[ds_idx] / interval;
1096                 break;
1097             default:
1098                 rrd_set_error("rrd contains unknown DS type : '%s'",
1099                               rrd->ds_def[ds_idx].dst);
1100                 return -1;
1101             }
1102             /* break out of this for loop if the error string is set */
1103             if (rrd_test_error()) {
1104                 return -1;
1105             }
1106             /* make sure pdp_temp is neither too large or too small
1107              * if any of these occur it becomes unknown ...
1108              * sorry folks ... */
1109             if (!isnan(rate) &&
1110                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1111                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1112                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1113                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1114                 pdp_new[ds_idx] = DNAN;
1115             }
1116         } else {
1117             /* no news is news all the same */
1118             pdp_new[ds_idx] = DNAN;
1119         }
1120
1121
1122         /* make a copy of the command line argument for the next run */
1123 #ifdef DEBUG
1124         fprintf(stderr, "prep ds[%lu]\t"
1125                 "last_arg '%s'\t"
1126                 "this_arg '%s'\t"
1127                 "pdp_new %10.2f\n",
1128                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1129                 pdp_new[ds_idx]);
1130 #endif
1131         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1132                 LAST_DS_LEN - 1);
1133         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1134     }
1135     return 0;
1136 }
1137
1138 /*
1139  * How many PDP steps have elapsed since the last update? Returns the answer,
1140  * and stores the time between the last update and the last PDP in pre_time,
1141  * and the time between the last PDP and the current time in post_int.
1142  */
1143 static int calculate_elapsed_steps(
1144     rrd_t *rrd,
1145     unsigned long current_time,
1146     unsigned long current_time_usec,
1147     double interval,
1148     double *pre_int,
1149     double *post_int,
1150     unsigned long *proc_pdp_cnt)
1151 {
1152     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1153     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1154                                  * time */
1155     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1156                                  * when it was last updated */
1157     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1158
1159     /* when was the current pdp started */
1160     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1161     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1162
1163     /* when did the last pdp_st occur */
1164     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1165     occu_pdp_st = current_time - occu_pdp_age;
1166
1167     if (occu_pdp_st > proc_pdp_st) {
1168         /* OK we passed the pdp_st moment */
1169         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1170                                                                      * occurred before the latest
1171                                                                      * pdp_st moment*/
1172         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1173         *post_int = occu_pdp_age;   /* how much after it */
1174         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1175     } else {
1176         *pre_int = interval;
1177         *post_int = 0;
1178     }
1179
1180     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1181
1182 #ifdef DEBUG
1183     printf("proc_pdp_age %lu\t"
1184            "proc_pdp_st %lu\t"
1185            "occu_pfp_age %lu\t"
1186            "occu_pdp_st %lu\t"
1187            "int %lf\t"
1188            "pre_int %lf\t"
1189            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1190            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1191 #endif
1192
1193     /* compute the number of elapsed pdp_st moments */
1194     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1195 }
1196
1197 /*
1198  * Increment the PDP values by the values in pdp_new, or else initialize them.
1199  */
1200 static void simple_update(
1201     rrd_t *rrd,
1202     double interval,
1203     rrd_value_t *pdp_new)
1204 {
1205     int       i;
1206
1207     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1208         if (isnan(pdp_new[i])) {
1209             /* this is not really accurate if we use subsecond data arrival time
1210                should have thought of it when going subsecond resolution ...
1211                sorry next format change we will have it! */
1212             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1213                 floor(interval);
1214         } else {
1215             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1216                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1217             } else {
1218                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1219             }
1220         }
1221 #ifdef DEBUG
1222         fprintf(stderr,
1223                 "NO PDP  ds[%i]\t"
1224                 "value %10.2f\t"
1225                 "unkn_sec %5lu\n",
1226                 i,
1227                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1228                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1229 #endif
1230     }
1231 }
1232
1233 /*
1234  * Call process_pdp_st for each DS.
1235  *
1236  * Returns 0 on success, -1 on error.
1237  */
1238 static int process_all_pdp_st(
1239     rrd_t *rrd,
1240     double interval,
1241     double pre_int,
1242     double post_int,
1243     unsigned long elapsed_pdp_st,
1244     rrd_value_t *pdp_new,
1245     rrd_value_t *pdp_temp)
1246 {
1247     unsigned long ds_idx;
1248
1249     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1250        rate*seconds which occurred up to the last run.
1251        pdp_new[] contains rate*seconds from the latest run.
1252        pdp_temp[] will contain the rate for cdp */
1253
1254     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1255         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1256                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1257                            pdp_new, pdp_temp) == -1) {
1258             return -1;
1259         }
1260 #ifdef DEBUG
1261         fprintf(stderr, "PDP UPD ds[%lu]\t"
1262                 "elapsed_pdp_st %lu\t"
1263                 "pdp_temp %10.2f\t"
1264                 "new_prep %10.2f\t"
1265                 "new_unkn_sec %5lu\n",
1266                 ds_idx,
1267                 elapsed_pdp_st,
1268                 pdp_temp[ds_idx],
1269                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1270                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1271 #endif
1272     }
1273     return 0;
1274 }
1275
1276 /*
1277  * Process an update that occurs after one of the PDP moments.
1278  * Increments the PDP value, sets NAN if time greater than the
1279  * heartbeats have elapsed, processes CDEFs.
1280  *
1281  * Returns 0 on success, -1 on error.
1282  */
1283 static int process_pdp_st(
1284     rrd_t *rrd,
1285     unsigned long ds_idx,
1286     double interval,
1287     double pre_int,
1288     double post_int,
1289     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1290     rrd_value_t *pdp_new,
1291     rrd_value_t *pdp_temp)
1292 {
1293     int       i;
1294
1295     /* update pdp_prep to the current pdp_st. */
1296     double    pre_unknown = 0.0;
1297     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1298     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1299
1300     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1301
1302     rpnstack_init(&rpnstack);
1303
1304
1305     if (isnan(pdp_new[ds_idx])) {
1306         /* a final bit of unknown to be added before calculation
1307            we use a temporary variable for this so that we
1308            don't have to turn integer lines before using the value */
1309         pre_unknown = pre_int;
1310     } else {
1311         if (isnan(scratch[PDP_val].u_val)) {
1312             scratch[PDP_val].u_val = 0;
1313         }
1314         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1315     }
1316
1317     /* if too much of the pdp_prep is unknown we dump it */
1318     /* if the interval is larger thatn mrhb we get NAN */
1319     if ((interval > mrhb) ||
1320         (rrd->stat_head->pdp_step / 2.0 <
1321          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1322         pdp_temp[ds_idx] = DNAN;
1323     } else {
1324         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1325             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1326              pre_unknown);
1327     }
1328
1329     /* process CDEF data sources; remember each CDEF DS can
1330      * only reference other DS with a lower index number */
1331     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1332         rpnp_t   *rpnp;
1333
1334         rpnp =
1335             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1336         /* substitute data values for OP_VARIABLE nodes */
1337         for (i = 0; rpnp[i].op != OP_END; i++) {
1338             if (rpnp[i].op == OP_VARIABLE) {
1339                 rpnp[i].op = OP_NUMBER;
1340                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1341             }
1342         }
1343         /* run the rpn calculator */
1344         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1345             free(rpnp);
1346             rpnstack_free(&rpnstack);
1347             return -1;
1348         }
1349     }
1350
1351     /* make pdp_prep ready for the next run */
1352     if (isnan(pdp_new[ds_idx])) {
1353         /* this is not realy accurate if we use subsecond data arival time
1354            should have thought of it when going subsecond resolution ...
1355            sorry next format change we will have it! */
1356         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1357         scratch[PDP_val].u_val = DNAN;
1358     } else {
1359         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1360         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1361     }
1362     rpnstack_free(&rpnstack);
1363     return 0;
1364 }
1365
1366 /*
1367  * Iterate over all the RRAs for a given DS and:
1368  * 1. Decide whether to schedule a smooth later
1369  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1370  * 3. Update the CDP
1371  *
1372  * Returns 0 on success, -1 on error
1373  */
1374 static int update_all_cdp_prep(
1375     rrd_t *rrd,
1376     unsigned long *rra_step_cnt,
1377     unsigned long rra_begin,
1378     rrd_file_t *rrd_file,
1379     unsigned long elapsed_pdp_st,
1380     unsigned long proc_pdp_cnt,
1381     rrd_value_t **last_seasonal_coef,
1382     rrd_value_t **seasonal_coef,
1383     rrd_value_t *pdp_temp,
1384     unsigned long *skip_update,
1385     int *schedule_smooth)
1386 {
1387     unsigned long rra_idx;
1388
1389     /* index into the CDP scratch array */
1390     enum cf_en current_cf;
1391     unsigned long rra_start;
1392
1393     /* number of rows to be updated in an RRA for a data value. */
1394     unsigned long start_pdp_offset;
1395
1396     rra_start = rra_begin;
1397     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1398         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1399         start_pdp_offset =
1400             rrd->rra_def[rra_idx].pdp_cnt -
1401             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1402         skip_update[rra_idx] = 0;
1403         if (start_pdp_offset <= elapsed_pdp_st) {
1404             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1405                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1406         } else {
1407             rra_step_cnt[rra_idx] = 0;
1408         }
1409
1410         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1411             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1412              * so that they will be correct for the next observed value; note that for
1413              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1414              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1415             if (rra_step_cnt[rra_idx] > 1) {
1416                 skip_update[rra_idx] = 1;
1417                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1418                                 elapsed_pdp_st, last_seasonal_coef);
1419                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1420                                 elapsed_pdp_st + 1, seasonal_coef);
1421             }
1422             /* periodically run a smoother for seasonal effects */
1423             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1424 #ifdef DEBUG
1425                 fprintf(stderr,
1426                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1427                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1428                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1429                         u_cnt);
1430 #endif
1431                 *schedule_smooth = 1;
1432             }
1433         }
1434         if (rrd_test_error())
1435             return -1;
1436
1437         if (update_cdp_prep
1438             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1439              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1440              current_cf) == -1) {
1441             return -1;
1442         }
1443         rra_start +=
1444             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1445             sizeof(rrd_value_t);
1446     }
1447     return 0;
1448 }
1449
1450 /* 
1451  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1452  */
1453 static int do_schedule_smooth(
1454     rrd_t *rrd,
1455     unsigned long rra_idx,
1456     unsigned long elapsed_pdp_st)
1457 {
1458     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1459     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1460     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1461     unsigned long seasonal_smooth_idx =
1462         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1463     unsigned long *init_seasonal =
1464         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1465
1466     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1467      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1468      * really an RRA level, not a data source within RRA level parameter, but
1469      * the rra_def is read only for rrd_update (not flushed to disk). */
1470     if (*init_seasonal > BURNIN_CYCLES) {
1471         /* someone has no doubt invented a trick to deal with this wrap around,
1472          * but at least this code is clear. */
1473         if (seasonal_smooth_idx > cur_row) {
1474             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1475              * between PDP and CDP */
1476             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1477         }
1478         /* can't rely on negative numbers because we are working with
1479          * unsigned values */
1480         return (cur_row + elapsed_pdp_st >= row_cnt
1481                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1482     }
1483     /* mark off one of the burn-in cycles */
1484     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1485 }
1486
1487 /*
1488  * For a given RRA, iterate over the data sources and call the appropriate
1489  * consolidation function.
1490  *
1491  * Returns 0 on success, -1 on error.
1492  */
1493 static int update_cdp_prep(
1494     rrd_t *rrd,
1495     unsigned long elapsed_pdp_st,
1496     unsigned long start_pdp_offset,
1497     unsigned long *rra_step_cnt,
1498     int rra_idx,
1499     rrd_value_t *pdp_temp,
1500     rrd_value_t *last_seasonal_coef,
1501     rrd_value_t *seasonal_coef,
1502     int current_cf)
1503 {
1504     unsigned long ds_idx, cdp_idx;
1505
1506     /* update CDP_PREP areas */
1507     /* loop over data soures within each RRA */
1508     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1509
1510         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1511
1512         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1513             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1514                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1515                        elapsed_pdp_st, start_pdp_offset,
1516                        rrd->rra_def[rra_idx].pdp_cnt,
1517                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1518                        rra_idx, ds_idx);
1519         } else {
1520             /* Nothing to consolidate if there's one PDP per CDP. However, if
1521              * we've missed some PDPs, let's update null counters etc. */
1522             if (elapsed_pdp_st > 2) {
1523                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1524                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1525                           current_cf);
1526             }
1527         }
1528
1529         if (rrd_test_error())
1530             return -1;
1531     }                   /* endif data sources loop */
1532     return 0;
1533 }
1534
1535 /*
1536  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1537  * primary value, secondary value, and # of unknowns.
1538  */
1539 static void update_cdp(
1540     unival *scratch,
1541     int current_cf,
1542     rrd_value_t pdp_temp_val,
1543     unsigned long rra_step_cnt,
1544     unsigned long elapsed_pdp_st,
1545     unsigned long start_pdp_offset,
1546     unsigned long pdp_cnt,
1547     rrd_value_t xff,
1548     int i,
1549     int ii)
1550 {
1551     /* shorthand variables */
1552     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1553     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1554     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1555     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1556
1557     if (rra_step_cnt) {
1558         /* If we are in this block, as least 1 CDP value will be written to
1559          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1560          * to be written, then the "fill in" value is the CDP_secondary_val
1561          * entry. */
1562         if (isnan(pdp_temp_val)) {
1563             *cdp_unkn_pdp_cnt += start_pdp_offset;
1564             *cdp_secondary_val = DNAN;
1565         } else {
1566             /* CDP_secondary value is the RRA "fill in" value for intermediary
1567              * CDP data entries. No matter the CF, the value is the same because
1568              * the average, max, min, and last of a list of identical values is
1569              * the same, namely, the value itself. */
1570             *cdp_secondary_val = pdp_temp_val;
1571         }
1572
1573         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1574             *cdp_primary_val = DNAN;
1575             if (current_cf == CF_AVERAGE) {
1576                 *cdp_val =
1577                     initialize_average_carry_over(pdp_temp_val,
1578                                                   elapsed_pdp_st,
1579                                                   start_pdp_offset, pdp_cnt);
1580             } else {
1581                 *cdp_val = pdp_temp_val;
1582             }
1583         } else {
1584             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1585                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1586         }               /* endif meets xff value requirement for a valid value */
1587         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1588          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1589         if (isnan(pdp_temp_val))
1590             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1591         else
1592             *cdp_unkn_pdp_cnt = 0;
1593     } else {            /* rra_step_cnt[i]  == 0 */
1594
1595 #ifdef DEBUG
1596         if (isnan(*cdp_val)) {
1597             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1598                     i, ii);
1599         } else {
1600             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1601                     i, ii, *cdp_val);
1602         }
1603 #endif
1604         if (isnan(pdp_temp_val)) {
1605             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1606         } else {
1607             *cdp_val =
1608                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1609                                   current_cf, i, ii);
1610         }
1611     }
1612 }
1613
1614 /*
1615  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1616  * on the type of consolidation function.
1617  */
1618 static void initialize_cdp_val(
1619     unival *scratch,
1620     int current_cf,
1621     rrd_value_t pdp_temp_val,
1622     unsigned long elapsed_pdp_st,
1623     unsigned long start_pdp_offset,
1624     unsigned long pdp_cnt)
1625 {
1626     rrd_value_t cum_val, cur_val;
1627
1628     switch (current_cf) {
1629     case CF_AVERAGE:
1630         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1631         cur_val = IFDNAN(pdp_temp_val, 0.0);
1632         scratch[CDP_primary_val].u_val =
1633             (cum_val + cur_val * start_pdp_offset) /
1634             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1635         scratch[CDP_val].u_val =
1636             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1637                                           start_pdp_offset, pdp_cnt);
1638         break;
1639     case CF_MAXIMUM:
1640         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1641         cur_val = IFDNAN(pdp_temp_val, -DINF);
1642 #if 0
1643 #ifdef DEBUG
1644         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1645             fprintf(stderr,
1646                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1647                     i, ii);
1648             exit(-1);
1649         }
1650 #endif
1651 #endif
1652         if (cur_val > cum_val)
1653             scratch[CDP_primary_val].u_val = cur_val;
1654         else
1655             scratch[CDP_primary_val].u_val = cum_val;
1656         /* initialize carry over value */
1657         scratch[CDP_val].u_val = pdp_temp_val;
1658         break;
1659     case CF_MINIMUM:
1660         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1661         cur_val = IFDNAN(pdp_temp_val, DINF);
1662 #if 0
1663 #ifdef DEBUG
1664         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1665             fprintf(stderr,
1666                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1667                     ii);
1668             exit(-1);
1669         }
1670 #endif
1671 #endif
1672         if (cur_val < cum_val)
1673             scratch[CDP_primary_val].u_val = cur_val;
1674         else
1675             scratch[CDP_primary_val].u_val = cum_val;
1676         /* initialize carry over value */
1677         scratch[CDP_val].u_val = pdp_temp_val;
1678         break;
1679     case CF_LAST:
1680     default:
1681         scratch[CDP_primary_val].u_val = pdp_temp_val;
1682         /* initialize carry over value */
1683         scratch[CDP_val].u_val = pdp_temp_val;
1684         break;
1685     }
1686 }
1687
1688 /*
1689  * Update the consolidation function for Holt-Winters functions as
1690  * well as other functions that don't actually consolidate multiple
1691  * PDPs.
1692  */
1693 static void reset_cdp(
1694     rrd_t *rrd,
1695     unsigned long elapsed_pdp_st,
1696     rrd_value_t *pdp_temp,
1697     rrd_value_t *last_seasonal_coef,
1698     rrd_value_t *seasonal_coef,
1699     int rra_idx,
1700     int ds_idx,
1701     int cdp_idx,
1702     enum cf_en current_cf)
1703 {
1704     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1705
1706     switch (current_cf) {
1707     case CF_AVERAGE:
1708     default:
1709         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1710         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1711         break;
1712     case CF_SEASONAL:
1713     case CF_DEVSEASONAL:
1714         /* need to update cached seasonal values, so they are consistent
1715          * with the bulk update */
1716         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1717          * CDP_last_deviation are the same. */
1718         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1719         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1720         break;
1721     case CF_HWPREDICT:
1722     case CF_MHWPREDICT:
1723         /* need to update the null_count and last_null_count.
1724          * even do this for non-DNAN pdp_temp because the
1725          * algorithm is not learning from batch updates. */
1726         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1727         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1728         /* fall through */
1729     case CF_DEVPREDICT:
1730         scratch[CDP_primary_val].u_val = DNAN;
1731         scratch[CDP_secondary_val].u_val = DNAN;
1732         break;
1733     case CF_FAILURES:
1734         /* do not count missed bulk values as failures */
1735         scratch[CDP_primary_val].u_val = 0;
1736         scratch[CDP_secondary_val].u_val = 0;
1737         /* need to reset violations buffer.
1738          * could do this more carefully, but for now, just
1739          * assume a bulk update wipes away all violations. */
1740         erase_violations(rrd, cdp_idx, rra_idx);
1741         break;
1742     }
1743 }
1744
1745 static rrd_value_t initialize_average_carry_over(
1746     rrd_value_t pdp_temp_val,
1747     unsigned long elapsed_pdp_st,
1748     unsigned long start_pdp_offset,
1749     unsigned long pdp_cnt)
1750 {
1751     /* initialize carry over value */
1752     if (isnan(pdp_temp_val)) {
1753         return DNAN;
1754     }
1755     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1756 }
1757
1758 /*
1759  * Update or initialize a CDP value based on the consolidation
1760  * function.
1761  *
1762  * Returns the new value.
1763  */
1764 static rrd_value_t calculate_cdp_val(
1765     rrd_value_t cdp_val,
1766     rrd_value_t pdp_temp_val,
1767     unsigned long elapsed_pdp_st,
1768     int current_cf,
1769 #ifdef DEBUG
1770     int i,
1771     int ii
1772 #else
1773     int UNUSED(i),
1774     int UNUSED(ii)
1775 #endif
1776     )
1777 {
1778     if (isnan(cdp_val)) {
1779         if (current_cf == CF_AVERAGE) {
1780             pdp_temp_val *= elapsed_pdp_st;
1781         }
1782 #ifdef DEBUG
1783         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1784                 i, ii, pdp_temp_val);
1785 #endif
1786         return pdp_temp_val;
1787     }
1788     if (current_cf == CF_AVERAGE)
1789         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1790     if (current_cf == CF_MINIMUM)
1791         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1792     if (current_cf == CF_MAXIMUM)
1793         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1794
1795     return pdp_temp_val;
1796 }
1797
1798 /*
1799  * For each RRA, update the seasonal values and then call update_aberrant_CF
1800  * for each data source.
1801  *
1802  * Return 0 on success, -1 on error.
1803  */
1804 static int update_aberrant_cdps(
1805     rrd_t *rrd,
1806     rrd_file_t *rrd_file,
1807     unsigned long rra_begin,
1808     unsigned long elapsed_pdp_st,
1809     rrd_value_t *pdp_temp,
1810     rrd_value_t **seasonal_coef)
1811 {
1812     unsigned long rra_idx, ds_idx, j;
1813
1814     /* number of PDP steps since the last update that
1815      * are assigned to the first CDP to be generated
1816      * since the last update. */
1817     unsigned short scratch_idx;
1818     unsigned long rra_start;
1819     enum cf_en current_cf;
1820
1821     /* this loop is only entered if elapsed_pdp_st < 3 */
1822     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1823          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1824         rra_start = rra_begin;
1825         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1826             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1827                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1828                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1829                     if (scratch_idx == CDP_primary_val) {
1830                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1831                                         elapsed_pdp_st + 1, seasonal_coef);
1832                     } else {
1833                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1834                                         elapsed_pdp_st + 2, seasonal_coef);
1835                     }
1836                 }
1837                 if (rrd_test_error())
1838                     return -1;
1839                 /* loop over data soures within each RRA */
1840                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1841                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1842                                        rra_idx * (rrd->stat_head->ds_cnt) +
1843                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1844                                        *seasonal_coef);
1845                 }
1846             }
1847             rra_start += rrd->rra_def[rra_idx].row_cnt
1848                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1849         }
1850     }
1851     return 0;
1852 }
1853
1854 /* 
1855  * Move sequentially through the file, writing one RRA at a time.  Note this
1856  * architecture divorces the computation of CDP with flushing updated RRA
1857  * entries to disk.
1858  *
1859  * Return 0 on success, -1 on error.
1860  */
1861 static int write_to_rras(
1862     rrd_t *rrd,
1863     rrd_file_t *rrd_file,
1864     unsigned long *rra_step_cnt,
1865     unsigned long rra_begin,
1866     time_t current_time,
1867     unsigned long *skip_update,
1868     rrd_info_t ** pcdp_summary)
1869 {
1870     unsigned long rra_idx;
1871     unsigned long rra_start;
1872     time_t    rra_time = 0; /* time of update for a RRA */
1873
1874     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1875     
1876     /* Ready to write to disk */
1877     rra_start = rra_begin;
1878
1879     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1880         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1881         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1882
1883         /* for cdp_prep */
1884         unsigned short scratch_idx;
1885         unsigned long step_subtract;
1886
1887         for (scratch_idx = CDP_primary_val,
1888                  step_subtract = 1;
1889              rra_step_cnt[rra_idx] > 0;
1890              rra_step_cnt[rra_idx]--,
1891                  scratch_idx = CDP_secondary_val,
1892                  step_subtract = 2) {
1893
1894             off_t rra_pos_new;
1895 #ifdef DEBUG
1896             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1897 #endif
1898             /* increment, with wrap-around */
1899             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1900               rra_ptr->cur_row = 0;
1901
1902             /* we know what our position should be */
1903             rra_pos_new = rra_start
1904               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1905
1906             /* re-seek if the position is wrong or we wrapped around */
1907             if (rra_pos_new != rrd_file->pos) {
1908                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1909                     rrd_set_error("seek error in rrd");
1910                     return -1;
1911                 }
1912             }
1913 #ifdef DEBUG
1914             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1915 #endif
1916
1917             if (skip_update[rra_idx])
1918                 continue;
1919
1920             if (*pcdp_summary != NULL) {
1921                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1922
1923                 rra_time = (current_time - current_time % step_time)
1924                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1925             }
1926
1927             if (write_RRA_row
1928                 (rrd_file, rrd, rra_idx, scratch_idx,
1929                  pcdp_summary, rra_time) == -1)
1930                 return -1;
1931
1932             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1933         }
1934
1935         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1936     } /* RRA LOOP */
1937
1938     return 0;
1939 }
1940
1941 /*
1942  * Write out one row of values (one value per DS) to the archive.
1943  *
1944  * Returns 0 on success, -1 on error.
1945  */
1946 static int write_RRA_row(
1947     rrd_file_t *rrd_file,
1948     rrd_t *rrd,
1949     unsigned long rra_idx,
1950     unsigned short CDP_scratch_idx,
1951     rrd_info_t ** pcdp_summary,
1952     time_t rra_time)
1953 {
1954     unsigned long ds_idx, cdp_idx;
1955     rrd_infoval_t iv;
1956
1957     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1958         /* compute the cdp index */
1959         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1960 #ifdef DEBUG
1961         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1962                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1963                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1964 #endif
1965         if (*pcdp_summary != NULL) {
1966             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1967             /* append info to the return hash */
1968             *pcdp_summary = rrd_info_push(*pcdp_summary,
1969                                           sprintf_alloc
1970                                           ("[%lli]RRA[%s][%lu]DS[%s]", rra_time,
1971                                            rrd->rra_def[rra_idx].cf_nam,
1972                                            rrd->rra_def[rra_idx].pdp_cnt,
1973                                            rrd->ds_def[ds_idx].ds_nam),
1974                                           RD_I_VAL, iv);
1975         }
1976         if (rrd_write(rrd_file,
1977                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1978                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1979             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1980             return -1;
1981         }
1982     }
1983     return 0;
1984 }
1985
1986 /*
1987  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1988  *
1989  * Returns 0 on success, -1 otherwise
1990  */
1991 static int smooth_all_rras(
1992     rrd_t *rrd,
1993     rrd_file_t *rrd_file,
1994     unsigned long rra_begin)
1995 {
1996     unsigned long rra_start = rra_begin;
1997     unsigned long rra_idx;
1998
1999     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2000         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2001             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2002 #ifdef DEBUG
2003             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2004 #endif
2005             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2006             if (rrd_test_error())
2007                 return -1;
2008         }
2009         rra_start += rrd->rra_def[rra_idx].row_cnt
2010             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2011     }
2012     return 0;
2013 }
2014
2015 #ifndef HAVE_MMAP
2016 /*
2017  * Flush changes to disk (unless we're using mmap)
2018  *
2019  * Returns 0 on success, -1 otherwise
2020  */
2021 static int write_changes_to_disk(
2022     rrd_t *rrd,
2023     rrd_file_t *rrd_file,
2024     int version)
2025 {
2026     /* we just need to write back the live header portion now */
2027     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2028                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2029                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2030                  SEEK_SET) != 0) {
2031         rrd_set_error("seek rrd for live header writeback");
2032         return -1;
2033     }
2034     if (version >= 3) {
2035         if (rrd_write(rrd_file, rrd->live_head,
2036                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2037             rrd_set_error("rrd_write live_head to rrd");
2038             return -1;
2039         }
2040     } else {
2041         if (rrd_write(rrd_file, rrd->legacy_last_up,
2042                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2043             rrd_set_error("rrd_write live_head to rrd");
2044             return -1;
2045         }
2046     }
2047
2048
2049     if (rrd_write(rrd_file, rrd->pdp_prep,
2050                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2051         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2052         rrd_set_error("rrd_write pdp_prep to rrd");
2053         return -1;
2054     }
2055
2056     if (rrd_write(rrd_file, rrd->cdp_prep,
2057                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2058                   rrd->stat_head->ds_cnt)
2059         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2060                       rrd->stat_head->ds_cnt)) {
2061
2062         rrd_set_error("rrd_write cdp_prep to rrd");
2063         return -1;
2064     }
2065
2066     if (rrd_write(rrd_file, rrd->rra_ptr,
2067                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2068         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2069         rrd_set_error("rrd_write rra_ptr to rrd");
2070         return -1;
2071     }
2072     return 0;
2073 }
2074 #endif