fixed indentation ... gnu indent results are not realy beautifl. I might switch...
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.1  Copyright by Tobi Oetiker, 1997-2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     rrd_info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     unsigned long *rra_current,
99     time_t *current_time,
100     unsigned long *current_time_usec,
101     rrd_value_t *pdp_temp,
102     rrd_value_t *pdp_new,
103     unsigned long *rra_step_cnt,
104     char **updvals,
105     long *tmpl_idx,
106     unsigned long tmpl_cnt,
107     rrd_info_t ** pcdp_summary,
108     int version,
109     unsigned long *skip_update,
110     int *schedule_smooth);
111
112 static int parse_ds(
113     rrd_t *rrd,
114     char **updvals,
115     long *tmpl_idx,
116     char *input,
117     unsigned long tmpl_cnt,
118     time_t *current_time,
119     unsigned long *current_time_usec,
120     int version);
121
122 static int get_time_from_reading(
123     rrd_t *rrd,
124     char timesyntax,
125     char **updvals,
126     time_t *current_time,
127     unsigned long *current_time_usec,
128     int version);
129
130 static int update_pdp_prep(
131     rrd_t *rrd,
132     char **updvals,
133     rrd_value_t *pdp_new,
134     double interval);
135
136 static int calculate_elapsed_steps(
137     rrd_t *rrd,
138     unsigned long current_time,
139     unsigned long current_time_usec,
140     double interval,
141     double *pre_int,
142     double *post_int,
143     unsigned long *proc_pdp_cnt);
144
145 static void simple_update(
146     rrd_t *rrd,
147     double interval,
148     rrd_value_t *pdp_new);
149
150 static int process_all_pdp_st(
151     rrd_t *rrd,
152     double interval,
153     double pre_int,
154     double post_int,
155     unsigned long elapsed_pdp_st,
156     rrd_value_t *pdp_new,
157     rrd_value_t *pdp_temp);
158
159 static int process_pdp_st(
160     rrd_t *rrd,
161     unsigned long ds_idx,
162     double interval,
163     double pre_int,
164     double post_int,
165     long diff_pdp_st,
166     rrd_value_t *pdp_new,
167     rrd_value_t *pdp_temp);
168
169 static int update_all_cdp_prep(
170     rrd_t *rrd,
171     unsigned long *rra_step_cnt,
172     unsigned long rra_begin,
173     rrd_file_t *rrd_file,
174     unsigned long elapsed_pdp_st,
175     unsigned long proc_pdp_cnt,
176     rrd_value_t **last_seasonal_coef,
177     rrd_value_t **seasonal_coef,
178     rrd_value_t *pdp_temp,
179     unsigned long *rra_current,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long *rra_current,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     unsigned long *rra_current,
259     time_t current_time,
260     unsigned long *skip_update,
261     rrd_info_t ** pcdp_summary);
262
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned long *rra_current,
268     unsigned short CDP_scratch_idx,
269     rrd_info_t ** pcdp_summary,
270     time_t rra_time);
271
272 static int smooth_all_rras(
273     rrd_t *rrd,
274     rrd_file_t *rrd_file,
275     unsigned long rra_begin);
276
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279     rrd_t *rrd,
280     rrd_file_t *rrd_file,
281     int version);
282 #endif
283
284 /*
285  * normalize time as returned by gettimeofday. usec part must
286  * be always >= 0
287  */
288 static inline void normalize_time(
289     struct timeval *t)
290 {
291     if (t->tv_usec < 0) {
292         t->tv_sec--;
293         t->tv_usec += 1e6L;
294     }
295 }
296
297 /*
298  * Sets current_time and current_time_usec based on the current time.
299  * current_time_usec is set to 0 if the version number is 1 or 2.
300  */
301 static inline void initialize_time(
302     time_t *current_time,
303     unsigned long *current_time_usec,
304     int version)
305 {
306     struct timeval tmp_time;    /* used for time conversion */
307
308     gettimeofday(&tmp_time, 0);
309     normalize_time(&tmp_time);
310     *current_time = tmp_time.tv_sec;
311     if (version >= 3) {
312         *current_time_usec = tmp_time.tv_usec;
313     } else {
314         *current_time_usec = 0;
315     }
316 }
317
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319
320 rrd_info_t *rrd_update_v(
321     int argc,
322     char **argv)
323 {
324     char     *tmplt = NULL;
325     rrd_info_t *result = NULL;
326     rrd_infoval_t rc;
327     struct option long_options[] = {
328         {"template", required_argument, 0, 't'},
329         {0, 0, 0, 0}
330     };
331
332     rc.u_int = -1;
333     optind = 0;
334     opterr = 0;         /* initialize getopt */
335
336     while (1) {
337         int       option_index = 0;
338         int       opt;
339
340         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341
342         if (opt == EOF)
343             break;
344
345         switch (opt) {
346         case 't':
347             tmplt = optarg;
348             break;
349
350         case '?':
351             rrd_set_error("unknown option '%s'", argv[optind - 1]);
352             goto end_tag;
353         }
354     }
355
356     /* need at least 2 arguments: filename, data. */
357     if (argc - optind < 2) {
358         rrd_set_error("Not enough arguments");
359         goto end_tag;
360     }
361     rc.u_int = 0;
362     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363     rc.u_int = _rrd_update(argv[optind], tmplt,
364                            argc - optind - 1,
365                            (const char **) (argv + optind + 1), result);
366     result->value.u_int = rc.u_int;
367   end_tag:
368     return result;
369 }
370
371 int rrd_update(
372     int argc,
373     char **argv)
374 {
375     struct option long_options[] = {
376         {"template", required_argument, 0, 't'},
377         {0, 0, 0, 0}
378     };
379     int       option_index = 0;
380     int       opt;
381     char     *tmplt = NULL;
382     int       rc = -1;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case '?':
399             rrd_set_error("unknown option '%s'", argv[optind - 1]);
400             goto out;
401         }
402     }
403
404     /* need at least 2 arguments: filename, data. */
405     if (argc - optind < 2) {
406         rrd_set_error("Not enough arguments");
407         goto out;
408     }
409
410     rc = rrd_update_r(argv[optind], tmplt,
411                       argc - optind - 1, (const char **) (argv + optind + 1));
412   out:
413     free(tmplt);
414     return rc;
415 }
416
417 int rrd_update_r(
418     const char *filename,
419     const char *tmplt,
420     int argc,
421     const char **argv)
422 {
423     return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
425
426 int _rrd_update(
427     const char *filename,
428     const char *tmplt,
429     int argc,
430     const char **argv,
431     rrd_info_t * pcdp_summary)
432 {
433
434     int       arg_i = 2;
435
436     unsigned long rra_begin;    /* byte pointer to the rra
437                                  * area in the rrd file.  this
438                                  * pointer never changes value */
439     unsigned long rra_current;  /* byte pointer to the current write
440                                  * spot in the rrd file. */
441     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
442                              * to the existing entry */
443     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
444                              * to the cdp values */
445
446     long     *tmpl_idx; /* index representing the settings
447                          * transported by the tmplt index */
448     unsigned long tmpl_cnt = 2; /* time and data */
449     rrd_t     rrd;
450     time_t    current_time = 0;
451     unsigned long current_time_usec = 0;    /* microseconds part of current time */
452     char    **updvals;
453     int       schedule_smooth = 0;
454
455     /* number of elapsed PDP steps since last update */
456     unsigned long *rra_step_cnt = NULL;
457
458     int       version;  /* rrd version */
459     rrd_file_t *rrd_file;
460     char     *arg_copy; /* for processing the argv */
461     unsigned long *skip_update; /* RRAs to advance but not write */
462
463     /* need at least 1 arguments: data. */
464     if (argc < 1) {
465         rrd_set_error("Not enough arguments");
466         goto err_out;
467     }
468
469     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470         goto err_free;
471     }
472     /* We are now at the beginning of the rra's */
473     rra_current = rra_begin = rrd_file->header_len;
474
475     version = atoi(rrd.stat_head->version);
476
477     initialize_time(&current_time, &current_time_usec, version);
478
479     /* get exclusive lock to whole file.
480      * lock gets removed when we close the file.
481      */
482     if (rrd_lock(rrd_file) != 0) {
483         rrd_set_error("could not lock RRD");
484         goto err_close;
485     }
486
487     if (allocate_data_structures(&rrd, &updvals,
488                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489                                  &rra_step_cnt, &skip_update,
490                                  &pdp_new) == -1) {
491         goto err_close;
492     }
493
494     /* loop through the arguments. */
495     for (arg_i = 0; arg_i < argc; arg_i++) {
496         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497             rrd_set_error("failed duplication argv entry");
498             break;
499         }
500         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501                         &current_time, &current_time_usec, pdp_temp, pdp_new,
502                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503                         &pcdp_summary, version, skip_update,
504                         &schedule_smooth) == -1) {
505             if (rrd_test_error()) { /* Should have error string always here */
506                 char     *save_error;
507
508                 /* Prepend file name to error message */
509                 if ((save_error = strdup(rrd_get_error())) != NULL) {
510                     rrd_set_error("%s: %s", filename, save_error);
511                     free(save_error);
512                 }
513             }
514             free(arg_copy);
515             break;
516         }
517         free(arg_copy);
518     }
519
520     free(rra_step_cnt);
521
522     /* if we got here and if there is an error and if the file has not been
523      * written to, then close things up and return. */
524     if (rrd_test_error()) {
525         goto err_free_structures;
526     }
527 #ifndef HAVE_MMAP
528     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
529         goto err_free_structures;
530     }
531 #endif
532
533     /* calling the smoothing code here guarantees at most one smoothing
534      * operation per rrd_update call. Unfortunately, it is possible with bulk
535      * updates, or a long-delayed update for smoothing to occur off-schedule.
536      * This really isn't critical except during the burn-in cycles. */
537     if (schedule_smooth) {
538         smooth_all_rras(&rrd, rrd_file, rra_begin);
539     }
540
541 /*    rrd_dontneed(rrd_file,&rrd); */
542     rrd_free(&rrd);
543     rrd_close(rrd_file);
544
545     free(pdp_new);
546     free(tmpl_idx);
547     free(pdp_temp);
548     free(skip_update);
549     free(updvals);
550     return 0;
551
552   err_free_structures:
553     free(pdp_new);
554     free(tmpl_idx);
555     free(pdp_temp);
556     free(skip_update);
557     free(updvals);
558   err_close:
559     rrd_close(rrd_file);
560   err_free:
561     rrd_free(&rrd);
562   err_out:
563     return -1;
564 }
565
566 /*
567  * get exclusive lock to whole file.
568  * lock gets removed when we close the file
569  *
570  * returns 0 on success
571  */
572 int rrd_lock(
573     rrd_file_t *file)
574 {
575     int       rcstat;
576
577     {
578 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
579         struct _stat st;
580
581         if (_fstat(file->fd, &st) == 0) {
582             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
583         } else {
584             rcstat = -1;
585         }
586 #else
587         struct flock lock;
588
589         lock.l_type = F_WRLCK;  /* exclusive write lock */
590         lock.l_len = 0; /* whole file */
591         lock.l_start = 0;   /* start of file */
592         lock.l_whence = SEEK_SET;   /* end of file */
593
594         rcstat = fcntl(file->fd, F_SETLK, &lock);
595 #endif
596     }
597
598     return (rcstat);
599 }
600
601 /*
602  * Allocate some important arrays used, and initialize the template.
603  *
604  * When it returns, either all of the structures are allocated
605  * or none of them are.
606  *
607  * Returns 0 on success, -1 on error.
608  */
609 static int allocate_data_structures(
610     rrd_t *rrd,
611     char ***updvals,
612     rrd_value_t **pdp_temp,
613     const char *tmplt,
614     long **tmpl_idx,
615     unsigned long *tmpl_cnt,
616     unsigned long **rra_step_cnt,
617     unsigned long **skip_update,
618     rrd_value_t **pdp_new)
619 {
620     unsigned  i, ii;
621     if ((*updvals = (char **) malloc(sizeof(char *)
622                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
623         rrd_set_error("allocating updvals pointer array.");
624         return -1;
625     }
626     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
627                                             * rrd->stat_head->ds_cnt)) ==
628         NULL) {
629         rrd_set_error("allocating pdp_temp.");
630         goto err_free_updvals;
631     }
632     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
633                                                  *
634                                                  rrd->stat_head->rra_cnt)) ==
635         NULL) {
636         rrd_set_error("allocating skip_update.");
637         goto err_free_pdp_temp;
638     }
639     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
640                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641         rrd_set_error("allocating tmpl_idx.");
642         goto err_free_skip_update;
643     }
644     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
645                                                   *
646                                                   (rrd->stat_head->
647                                                    rra_cnt))) == NULL) {
648         rrd_set_error("allocating rra_step_cnt.");
649         goto err_free_tmpl_idx;
650     }
651
652     /* initialize tmplt redirector */
653     /* default config example (assume DS 1 is a CDEF DS)
654        tmpl_idx[0] -> 0; (time)
655        tmpl_idx[1] -> 1; (DS 0)
656        tmpl_idx[2] -> 3; (DS 2)
657        tmpl_idx[3] -> 4; (DS 3) */
658     (*tmpl_idx)[0] = 0; /* time */
659     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
660         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
661             (*tmpl_idx)[ii++] = i;
662     }
663     *tmpl_cnt = ii;
664
665     if (tmplt != NULL) {
666         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
667             goto err_free_rra_step_cnt;
668         }
669     }
670
671     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
672                                            * rrd->stat_head->ds_cnt)) == NULL) {
673         rrd_set_error("allocating pdp_new.");
674         goto err_free_rra_step_cnt;
675     }
676
677     return 0;
678
679   err_free_rra_step_cnt:
680     free(*rra_step_cnt);
681   err_free_tmpl_idx:
682     free(*tmpl_idx);
683   err_free_skip_update:
684     free(*skip_update);
685   err_free_pdp_temp:
686     free(*pdp_temp);
687   err_free_updvals:
688     free(*updvals);
689     return -1;
690 }
691
692 /*
693  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
694  *
695  * Returns 0 on success.
696  */
697 static int parse_template(
698     rrd_t *rrd,
699     const char *tmplt,
700     unsigned long *tmpl_cnt,
701     long *tmpl_idx)
702 {
703     char     *dsname, *tmplt_copy;
704     unsigned int tmpl_len, i;
705     int       ret = 0;
706
707     *tmpl_cnt = 1;      /* the first entry is the time */
708
709     /* we should work on a writeable copy here */
710     if ((tmplt_copy = strdup(tmplt)) == NULL) {
711         rrd_set_error("error copying tmplt '%s'", tmplt);
712         ret = -1;
713         goto out;
714     }
715
716     dsname = tmplt_copy;
717     tmpl_len = strlen(tmplt_copy);
718     for (i = 0; i <= tmpl_len; i++) {
719         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
720             tmplt_copy[i] = '\0';
721             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
722                 rrd_set_error("tmplt contains more DS definitions than RRD");
723                 ret = -1;
724                 goto out_free_tmpl_copy;
725             }
726             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
727                 rrd_set_error("unknown DS name '%s'", dsname);
728                 ret = -1;
729                 goto out_free_tmpl_copy;
730             }
731             /* go to the next entry on the tmplt_copy */
732             if (i < tmpl_len)
733                 dsname = &tmplt_copy[i + 1];
734         }
735     }
736   out_free_tmpl_copy:
737     free(tmplt_copy);
738   out:
739     return ret;
740 }
741
742 /*
743  * Parse an update string, updates the primary data points (PDPs)
744  * and consolidated data points (CDPs), and writes changes to the RRAs.
745  *
746  * Returns 0 on success, -1 on error.
747  */
748 static int process_arg(
749     char *step_start,
750     rrd_t *rrd,
751     rrd_file_t *rrd_file,
752     unsigned long rra_begin,
753     unsigned long *rra_current,
754     time_t *current_time,
755     unsigned long *current_time_usec,
756     rrd_value_t *pdp_temp,
757     rrd_value_t *pdp_new,
758     unsigned long *rra_step_cnt,
759     char **updvals,
760     long *tmpl_idx,
761     unsigned long tmpl_cnt,
762     rrd_info_t ** pcdp_summary,
763     int version,
764     unsigned long *skip_update,
765     int *schedule_smooth)
766 {
767     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
768
769     /* a vector of future Holt-Winters seasonal coefs */
770     unsigned long elapsed_pdp_st;
771
772     double    interval, pre_int, post_int;  /* interval between this and
773                                              * the last run */
774     unsigned long proc_pdp_cnt;
775     unsigned long rra_start;
776
777     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
778                  current_time, current_time_usec, version) == -1) {
779         return -1;
780     }
781     /* seek to the beginning of the rra's */
782     if (*rra_current != rra_begin) {
783 #ifndef HAVE_MMAP
784         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
785             rrd_set_error("seek error in rrd");
786             return -1;
787         }
788 #endif
789         *rra_current = rra_begin;
790     }
791     rra_start = rra_begin;
792
793     interval = (double) (*current_time - rrd->live_head->last_up)
794         + (double) ((long) *current_time_usec -
795                     (long) rrd->live_head->last_up_usec) / 1e6f;
796
797     /* process the data sources and update the pdp_prep 
798      * area accordingly */
799     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
800         return -1;
801     }
802
803     elapsed_pdp_st = calculate_elapsed_steps(rrd,
804                                              *current_time,
805                                              *current_time_usec, interval,
806                                              &pre_int, &post_int,
807                                              &proc_pdp_cnt);
808
809     /* has a pdp_st moment occurred since the last run ? */
810     if (elapsed_pdp_st == 0) {
811         /* no we have not passed a pdp_st moment. therefore update is simple */
812         simple_update(rrd, interval, pdp_new);
813     } else {
814         /* an pdp_st has occurred. */
815         if (process_all_pdp_st(rrd, interval,
816                                pre_int, post_int,
817                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
818             return -1;
819         }
820         if (update_all_cdp_prep(rrd, rra_step_cnt,
821                                 rra_begin, rrd_file,
822                                 elapsed_pdp_st,
823                                 proc_pdp_cnt,
824                                 &last_seasonal_coef,
825                                 &seasonal_coef,
826                                 pdp_temp, rra_current,
827                                 skip_update, schedule_smooth) == -1) {
828             goto err_free_coefficients;
829         }
830         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
831                                  elapsed_pdp_st, pdp_temp,
832                                  &seasonal_coef) == -1) {
833             goto err_free_coefficients;
834         }
835         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
836                           rra_current, *current_time, skip_update,
837                           pcdp_summary) == -1) {
838             goto err_free_coefficients;
839         }
840     }                   /* endif a pdp_st has occurred */
841     rrd->live_head->last_up = *current_time;
842     rrd->live_head->last_up_usec = *current_time_usec;
843
844     if (version < 3) {
845         *rrd->legacy_last_up = rrd->live_head->last_up;
846     }
847     free(seasonal_coef);
848     free(last_seasonal_coef);
849     return 0;
850
851   err_free_coefficients:
852     free(seasonal_coef);
853     free(last_seasonal_coef);
854     return -1;
855 }
856
857 /*
858  * Parse a DS string (time + colon-separated values), storing the
859  * results in current_time, current_time_usec, and updvals.
860  *
861  * Returns 0 on success, -1 on error.
862  */
863 static int parse_ds(
864     rrd_t *rrd,
865     char **updvals,
866     long *tmpl_idx,
867     char *input,
868     unsigned long tmpl_cnt,
869     time_t *current_time,
870     unsigned long *current_time_usec,
871     int version)
872 {
873     char     *p;
874     unsigned long i;
875     char      timesyntax;
876
877     updvals[0] = input;
878     /* initialize all ds input to unknown except the first one
879        which has always got to be set */
880     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
881         updvals[i] = "U";
882
883     /* separate all ds elements; first must be examined separately
884        due to alternate time syntax */
885     if ((p = strchr(input, '@')) != NULL) {
886         timesyntax = '@';
887     } else if ((p = strchr(input, ':')) != NULL) {
888         timesyntax = ':';
889     } else {
890         rrd_set_error("expected timestamp not found in data source from %s",
891                       input);
892         return -1;
893     }
894     *p = '\0';
895     i = 1;
896     updvals[tmpl_idx[i++]] = p + 1;
897     while (*(++p)) {
898         if (*p == ':') {
899             *p = '\0';
900             if (i < tmpl_cnt) {
901                 updvals[tmpl_idx[i++]] = p + 1;
902             }
903         }
904     }
905
906     if (i != tmpl_cnt) {
907         rrd_set_error("expected %lu data source readings (got %lu) from %s",
908                       tmpl_cnt - 1, i, input);
909         return -1;
910     }
911
912     if (get_time_from_reading(rrd, timesyntax, updvals,
913                               current_time, current_time_usec,
914                               version) == -1) {
915         return -1;
916     }
917     return 0;
918 }
919
920 /*
921  * Parse the time in a DS string, store it in current_time and 
922  * current_time_usec and verify that it's later than the last
923  * update for this DS.
924  *
925  * Returns 0 on success, -1 on error.
926  */
927 static int get_time_from_reading(
928     rrd_t *rrd,
929     char timesyntax,
930     char **updvals,
931     time_t *current_time,
932     unsigned long *current_time_usec,
933     int version)
934 {
935     double    tmp;
936     char     *parsetime_error = NULL;
937     char     *old_locale;
938     rrd_time_value_t ds_tv;
939     struct timeval tmp_time;    /* used for time conversion */
940
941     /* get the time from the reading ... handle N */
942     if (timesyntax == '@') {    /* at-style */
943         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
944             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
945             return -1;
946         }
947         if (ds_tv.type == RELATIVE_TO_END_TIME ||
948             ds_tv.type == RELATIVE_TO_START_TIME) {
949             rrd_set_error("specifying time relative to the 'start' "
950                           "or 'end' makes no sense here: %s", updvals[0]);
951             return -1;
952         }
953         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
954         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
955     } else if (strcmp(updvals[0], "N") == 0) {
956         gettimeofday(&tmp_time, 0);
957         normalize_time(&tmp_time);
958         *current_time = tmp_time.tv_sec;
959         *current_time_usec = tmp_time.tv_usec;
960     } else {
961         old_locale = setlocale(LC_NUMERIC, "C");
962         tmp = strtod(updvals[0], 0);
963         setlocale(LC_NUMERIC, old_locale);
964         *current_time = floor(tmp);
965         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
966     }
967     /* dont do any correction for old version RRDs */
968     if (version < 3)
969         *current_time_usec = 0;
970
971     if (*current_time < rrd->live_head->last_up ||
972         (*current_time == rrd->live_head->last_up &&
973          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
974         rrd_set_error("illegal attempt to update using time %ld when "
975                       "last update time is %ld (minimum one second step)",
976                       *current_time, rrd->live_head->last_up);
977         return -1;
978     }
979     return 0;
980 }
981
982 /*
983  * Update pdp_new by interpreting the updvals according to the DS type
984  * (COUNTER, GAUGE, etc.).
985  *
986  * Returns 0 on success, -1 on error.
987  */
988 static int update_pdp_prep(
989     rrd_t *rrd,
990     char **updvals,
991     rrd_value_t *pdp_new,
992     double interval)
993 {
994     unsigned long ds_idx;
995     int       ii;
996     char     *endptr;   /* used in the conversion */
997     double    rate;
998     char     *old_locale;
999     enum dst_en dst_idx;
1000
1001     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1002         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1003
1004         /* make sure we do not build diffs with old last_ds values */
1005         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1006             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1007             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1008         }
1009
1010         /* NOTE: DST_CDEF should never enter this if block, because
1011          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1012          * accidently specified a value for the DST_CDEF. To handle this case,
1013          * an extra check is required. */
1014
1015         if ((updvals[ds_idx + 1][0] != 'U') &&
1016             (dst_idx != DST_CDEF) &&
1017             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1018             rate = DNAN;
1019
1020             /* pdp_new contains rate * time ... eg the bytes transferred during
1021              * the interval. Doing it this way saves a lot of math operations
1022              */
1023             switch (dst_idx) {
1024             case DST_COUNTER:
1025             case DST_DERIVE:
1026                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1027                     if ((updvals[ds_idx + 1][ii] < '0'
1028                          || updvals[ds_idx + 1][ii] > '9')
1029                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1030                         rrd_set_error("not a simple integer: '%s'",
1031                                       updvals[ds_idx + 1]);
1032                         return -1;
1033                     }
1034                 }
1035                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1036                     pdp_new[ds_idx] =
1037                         rrd_diff(updvals[ds_idx + 1],
1038                                  rrd->pdp_prep[ds_idx].last_ds);
1039                     if (dst_idx == DST_COUNTER) {
1040                         /* simple overflow catcher. This will fail
1041                          * terribly for non 32 or 64 bit counters
1042                          * ... are there any others in SNMP land?
1043                          */
1044                         if (pdp_new[ds_idx] < (double) 0.0)
1045                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1046                         if (pdp_new[ds_idx] < (double) 0.0)
1047                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1048                     }
1049                     rate = pdp_new[ds_idx] / interval;
1050                 } else {
1051                     pdp_new[ds_idx] = DNAN;
1052                 }
1053                 break;
1054             case DST_ABSOLUTE:
1055                 old_locale = setlocale(LC_NUMERIC, "C");
1056                 errno = 0;
1057                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1058                 setlocale(LC_NUMERIC, old_locale);
1059                 if (errno > 0) {
1060                     rrd_set_error("converting '%s' to float: %s",
1061                                   updvals[ds_idx + 1], rrd_strerror(errno));
1062                     return -1;
1063                 };
1064                 if (endptr[0] != '\0') {
1065                     rrd_set_error
1066                         ("conversion of '%s' to float not complete: tail '%s'",
1067                          updvals[ds_idx + 1], endptr);
1068                     return -1;
1069                 }
1070                 rate = pdp_new[ds_idx] / interval;
1071                 break;
1072             case DST_GAUGE:
1073                 errno = 0;
1074                 old_locale = setlocale(LC_NUMERIC, "C");
1075                 pdp_new[ds_idx] =
1076                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1077                 setlocale(LC_NUMERIC, old_locale);
1078                 if (errno) {
1079                     rrd_set_error("converting '%s' to float: %s",
1080                                   updvals[ds_idx + 1], rrd_strerror(errno));
1081                     return -1;
1082                 };
1083                 if (endptr[0] != '\0') {
1084                     rrd_set_error
1085                         ("conversion of '%s' to float not complete: tail '%s'",
1086                          updvals[ds_idx + 1], endptr);
1087                     return -1;
1088                 }
1089                 rate = pdp_new[ds_idx] / interval;
1090                 break;
1091             default:
1092                 rrd_set_error("rrd contains unknown DS type : '%s'",
1093                               rrd->ds_def[ds_idx].dst);
1094                 return -1;
1095             }
1096             /* break out of this for loop if the error string is set */
1097             if (rrd_test_error()) {
1098                 return -1;
1099             }
1100             /* make sure pdp_temp is neither too large or too small
1101              * if any of these occur it becomes unknown ...
1102              * sorry folks ... */
1103             if (!isnan(rate) &&
1104                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1105                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1106                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1107                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1108                 pdp_new[ds_idx] = DNAN;
1109             }
1110         } else {
1111             /* no news is news all the same */
1112             pdp_new[ds_idx] = DNAN;
1113         }
1114
1115
1116         /* make a copy of the command line argument for the next run */
1117 #ifdef DEBUG
1118         fprintf(stderr, "prep ds[%lu]\t"
1119                 "last_arg '%s'\t"
1120                 "this_arg '%s'\t"
1121                 "pdp_new %10.2f\n",
1122                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1123                 pdp_new[ds_idx]);
1124 #endif
1125         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1126                 LAST_DS_LEN - 1);
1127         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1128     }
1129     return 0;
1130 }
1131
1132 /*
1133  * How many PDP steps have elapsed since the last update? Returns the answer,
1134  * and stores the time between the last update and the last PDP in pre_time,
1135  * and the time between the last PDP and the current time in post_int.
1136  */
1137 static int calculate_elapsed_steps(
1138     rrd_t *rrd,
1139     unsigned long current_time,
1140     unsigned long current_time_usec,
1141     double interval,
1142     double *pre_int,
1143     double *post_int,
1144     unsigned long *proc_pdp_cnt)
1145 {
1146     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1147     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1148                                  * time */
1149     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1150                                  * when it was last updated */
1151     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1152
1153     /* when was the current pdp started */
1154     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1155     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1156
1157     /* when did the last pdp_st occur */
1158     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1159     occu_pdp_st = current_time - occu_pdp_age;
1160
1161     if (occu_pdp_st > proc_pdp_st) {
1162         /* OK we passed the pdp_st moment */
1163         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1164                                                                      * occurred before the latest
1165                                                                      * pdp_st moment*/
1166         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1167         *post_int = occu_pdp_age;   /* how much after it */
1168         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1169     } else {
1170         *pre_int = interval;
1171         *post_int = 0;
1172     }
1173
1174     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1175
1176 #ifdef DEBUG
1177     printf("proc_pdp_age %lu\t"
1178            "proc_pdp_st %lu\t"
1179            "occu_pfp_age %lu\t"
1180            "occu_pdp_st %lu\t"
1181            "int %lf\t"
1182            "pre_int %lf\t"
1183            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1184            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1185 #endif
1186
1187     /* compute the number of elapsed pdp_st moments */
1188     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1189 }
1190
1191 /*
1192  * Increment the PDP values by the values in pdp_new, or else initialize them.
1193  */
1194 static void simple_update(
1195     rrd_t *rrd,
1196     double interval,
1197     rrd_value_t *pdp_new)
1198 {
1199     int       i;
1200
1201     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1202         if (isnan(pdp_new[i])) {
1203             /* this is not really accurate if we use subsecond data arrival time
1204                should have thought of it when going subsecond resolution ...
1205                sorry next format change we will have it! */
1206             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1207                 floor(interval);
1208         } else {
1209             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1210                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1211             } else {
1212                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1213             }
1214         }
1215 #ifdef DEBUG
1216         fprintf(stderr,
1217                 "NO PDP  ds[%i]\t"
1218                 "value %10.2f\t"
1219                 "unkn_sec %5lu\n",
1220                 i,
1221                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1222                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1223 #endif
1224     }
1225 }
1226
1227 /*
1228  * Call process_pdp_st for each DS.
1229  *
1230  * Returns 0 on success, -1 on error.
1231  */
1232 static int process_all_pdp_st(
1233     rrd_t *rrd,
1234     double interval,
1235     double pre_int,
1236     double post_int,
1237     unsigned long elapsed_pdp_st,
1238     rrd_value_t *pdp_new,
1239     rrd_value_t *pdp_temp)
1240 {
1241     unsigned long ds_idx;
1242
1243     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1244        rate*seconds which occurred up to the last run.
1245        pdp_new[] contains rate*seconds from the latest run.
1246        pdp_temp[] will contain the rate for cdp */
1247
1248     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1249         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1250                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1251                            pdp_new, pdp_temp) == -1) {
1252             return -1;
1253         }
1254 #ifdef DEBUG
1255         fprintf(stderr, "PDP UPD ds[%lu]\t"
1256                 "elapsed_pdp_st %lu\t"
1257                 "pdp_temp %10.2f\t"
1258                 "new_prep %10.2f\t"
1259                 "new_unkn_sec %5lu\n",
1260                 ds_idx,
1261                 elapsed_pdp_st,
1262                 pdp_temp[ds_idx],
1263                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1264                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1265 #endif
1266     }
1267     return 0;
1268 }
1269
1270 /*
1271  * Process an update that occurs after one of the PDP moments.
1272  * Increments the PDP value, sets NAN if time greater than the
1273  * heartbeats have elapsed, processes CDEFs.
1274  *
1275  * Returns 0 on success, -1 on error.
1276  */
1277 static int process_pdp_st(
1278     rrd_t *rrd,
1279     unsigned long ds_idx,
1280     double interval,
1281     double pre_int,
1282     double post_int,
1283     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1284     rrd_value_t *pdp_new,
1285     rrd_value_t *pdp_temp)
1286 {
1287     int       i;
1288
1289     /* update pdp_prep to the current pdp_st. */
1290     double    pre_unknown = 0.0;
1291     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1292     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1293
1294     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1295
1296     rpnstack_init(&rpnstack);
1297
1298
1299     if (isnan(pdp_new[ds_idx])) {
1300         /* a final bit of unknown to be added before calculation
1301            we use a temporary variable for this so that we
1302            don't have to turn integer lines before using the value */
1303         pre_unknown = pre_int;
1304     } else {
1305         if (isnan(scratch[PDP_val].u_val)) {
1306             scratch[PDP_val].u_val = 0;
1307         }
1308         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1309     }
1310
1311     /* if too much of the pdp_prep is unknown we dump it */
1312     /* if the interval is larger thatn mrhb we get NAN */
1313     if ((interval > mrhb) ||
1314         (rrd->stat_head->pdp_step / 2.0 <
1315          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1316         pdp_temp[ds_idx] = DNAN;
1317     } else {
1318         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1319             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1320              pre_unknown);
1321     }
1322
1323     /* process CDEF data sources; remember each CDEF DS can
1324      * only reference other DS with a lower index number */
1325     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1326         rpnp_t   *rpnp;
1327
1328         rpnp =
1329             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1330         /* substitute data values for OP_VARIABLE nodes */
1331         for (i = 0; rpnp[i].op != OP_END; i++) {
1332             if (rpnp[i].op == OP_VARIABLE) {
1333                 rpnp[i].op = OP_NUMBER;
1334                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1335             }
1336         }
1337         /* run the rpn calculator */
1338         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1339             free(rpnp);
1340             rpnstack_free(&rpnstack);
1341             return -1;
1342         }
1343     }
1344
1345     /* make pdp_prep ready for the next run */
1346     if (isnan(pdp_new[ds_idx])) {
1347         /* this is not realy accurate if we use subsecond data arival time
1348            should have thought of it when going subsecond resolution ...
1349            sorry next format change we will have it! */
1350         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1351         scratch[PDP_val].u_val = DNAN;
1352     } else {
1353         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1354         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1355     }
1356     rpnstack_free(&rpnstack);
1357     return 0;
1358 }
1359
1360 /*
1361  * Iterate over all the RRAs for a given DS and:
1362  * 1. Decide whether to schedule a smooth later
1363  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1364  * 3. Update the CDP
1365  *
1366  * Returns 0 on success, -1 on error
1367  */
1368 static int update_all_cdp_prep(
1369     rrd_t *rrd,
1370     unsigned long *rra_step_cnt,
1371     unsigned long rra_begin,
1372     rrd_file_t *rrd_file,
1373     unsigned long elapsed_pdp_st,
1374     unsigned long proc_pdp_cnt,
1375     rrd_value_t **last_seasonal_coef,
1376     rrd_value_t **seasonal_coef,
1377     rrd_value_t *pdp_temp,
1378     unsigned long *rra_current,
1379     unsigned long *skip_update,
1380     int *schedule_smooth)
1381 {
1382     unsigned long rra_idx;
1383
1384     /* index into the CDP scratch array */
1385     enum cf_en current_cf;
1386     unsigned long rra_start;
1387
1388     /* number of rows to be updated in an RRA for a data value. */
1389     unsigned long start_pdp_offset;
1390
1391     rra_start = rra_begin;
1392     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1393         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1394         start_pdp_offset =
1395             rrd->rra_def[rra_idx].pdp_cnt -
1396             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1397         skip_update[rra_idx] = 0;
1398         if (start_pdp_offset <= elapsed_pdp_st) {
1399             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1400                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1401         } else {
1402             rra_step_cnt[rra_idx] = 0;
1403         }
1404
1405         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1406             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1407              * so that they will be correct for the next observed value; note that for
1408              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1409              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1410             if (rra_step_cnt[rra_idx] > 1) {
1411                 skip_update[rra_idx] = 1;
1412                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1413                                 elapsed_pdp_st, last_seasonal_coef);
1414                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1415                                 elapsed_pdp_st + 1, seasonal_coef);
1416             }
1417             /* periodically run a smoother for seasonal effects */
1418             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1419 #ifdef DEBUG
1420                 fprintf(stderr,
1421                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1422                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1423                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1424                         u_cnt);
1425 #endif
1426                 *schedule_smooth = 1;
1427             }
1428             *rra_current = rrd_tell(rrd_file);
1429         }
1430         if (rrd_test_error())
1431             return -1;
1432
1433         if (update_cdp_prep
1434             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1435              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1436              current_cf) == -1) {
1437             return -1;
1438         }
1439         rra_start +=
1440             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1441             sizeof(rrd_value_t);
1442     }
1443     return 0;
1444 }
1445
1446 /* 
1447  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1448  */
1449 static int do_schedule_smooth(
1450     rrd_t *rrd,
1451     unsigned long rra_idx,
1452     unsigned long elapsed_pdp_st)
1453 {
1454     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1455     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1456     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1457     unsigned long seasonal_smooth_idx =
1458         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1459     unsigned long *init_seasonal =
1460         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1461
1462     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1463      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1464      * really an RRA level, not a data source within RRA level parameter, but
1465      * the rra_def is read only for rrd_update (not flushed to disk). */
1466     if (*init_seasonal > BURNIN_CYCLES) {
1467         /* someone has no doubt invented a trick to deal with this wrap around,
1468          * but at least this code is clear. */
1469         if (seasonal_smooth_idx > cur_row) {
1470             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1471              * between PDP and CDP */
1472             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1473         }
1474         /* can't rely on negative numbers because we are working with
1475          * unsigned values */
1476         return (cur_row + elapsed_pdp_st >= row_cnt
1477                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1478     }
1479     /* mark off one of the burn-in cycles */
1480     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1481 }
1482
1483 /*
1484  * For a given RRA, iterate over the data sources and call the appropriate
1485  * consolidation function.
1486  *
1487  * Returns 0 on success, -1 on error.
1488  */
1489 static int update_cdp_prep(
1490     rrd_t *rrd,
1491     unsigned long elapsed_pdp_st,
1492     unsigned long start_pdp_offset,
1493     unsigned long *rra_step_cnt,
1494     int rra_idx,
1495     rrd_value_t *pdp_temp,
1496     rrd_value_t *last_seasonal_coef,
1497     rrd_value_t *seasonal_coef,
1498     int current_cf)
1499 {
1500     unsigned long ds_idx, cdp_idx;
1501
1502     /* update CDP_PREP areas */
1503     /* loop over data soures within each RRA */
1504     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1505
1506         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1507
1508         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1509             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1510                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1511                        elapsed_pdp_st, start_pdp_offset,
1512                        rrd->rra_def[rra_idx].pdp_cnt,
1513                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1514                        rra_idx, ds_idx);
1515         } else {
1516             /* Nothing to consolidate if there's one PDP per CDP. However, if
1517              * we've missed some PDPs, let's update null counters etc. */
1518             if (elapsed_pdp_st > 2) {
1519                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1520                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1521                           current_cf);
1522             }
1523         }
1524
1525         if (rrd_test_error())
1526             return -1;
1527     }                   /* endif data sources loop */
1528     return 0;
1529 }
1530
1531 /*
1532  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1533  * primary value, secondary value, and # of unknowns.
1534  */
1535 static void update_cdp(
1536     unival *scratch,
1537     int current_cf,
1538     rrd_value_t pdp_temp_val,
1539     unsigned long rra_step_cnt,
1540     unsigned long elapsed_pdp_st,
1541     unsigned long start_pdp_offset,
1542     unsigned long pdp_cnt,
1543     rrd_value_t xff,
1544     int i,
1545     int ii)
1546 {
1547     /* shorthand variables */
1548     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1549     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1550     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1551     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1552
1553     if (rra_step_cnt) {
1554         /* If we are in this block, as least 1 CDP value will be written to
1555          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1556          * to be written, then the "fill in" value is the CDP_secondary_val
1557          * entry. */
1558         if (isnan(pdp_temp_val)) {
1559             *cdp_unkn_pdp_cnt += start_pdp_offset;
1560             *cdp_secondary_val = DNAN;
1561         } else {
1562             /* CDP_secondary value is the RRA "fill in" value for intermediary
1563              * CDP data entries. No matter the CF, the value is the same because
1564              * the average, max, min, and last of a list of identical values is
1565              * the same, namely, the value itself. */
1566             *cdp_secondary_val = pdp_temp_val;
1567         }
1568
1569         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1570             *cdp_primary_val = DNAN;
1571             if (current_cf == CF_AVERAGE) {
1572                 *cdp_val =
1573                     initialize_average_carry_over(pdp_temp_val,
1574                                                   elapsed_pdp_st,
1575                                                   start_pdp_offset, pdp_cnt);
1576             } else {
1577                 *cdp_val = pdp_temp_val;
1578             }
1579         } else {
1580             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1581                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1582         }               /* endif meets xff value requirement for a valid value */
1583         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1584          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1585         if (isnan(pdp_temp_val))
1586             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1587         else
1588             *cdp_unkn_pdp_cnt = 0;
1589     } else {            /* rra_step_cnt[i]  == 0 */
1590
1591 #ifdef DEBUG
1592         if (isnan(*cdp_val)) {
1593             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1594                     i, ii);
1595         } else {
1596             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1597                     i, ii, *cdp_val);
1598         }
1599 #endif
1600         if (isnan(pdp_temp_val)) {
1601             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1602         } else {
1603             *cdp_val =
1604                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1605                                   current_cf, i, ii);
1606         }
1607     }
1608 }
1609
1610 /*
1611  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1612  * on the type of consolidation function.
1613  */
1614 static void initialize_cdp_val(
1615     unival *scratch,
1616     int current_cf,
1617     rrd_value_t pdp_temp_val,
1618     unsigned long elapsed_pdp_st,
1619     unsigned long start_pdp_offset,
1620     unsigned long pdp_cnt)
1621 {
1622     rrd_value_t cum_val, cur_val;
1623
1624     switch (current_cf) {
1625     case CF_AVERAGE:
1626         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1627         cur_val = IFDNAN(pdp_temp_val, 0.0);
1628         scratch[CDP_primary_val].u_val =
1629             (cum_val + cur_val * start_pdp_offset) /
1630             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1631         scratch[CDP_val].u_val =
1632             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1633                                           start_pdp_offset, pdp_cnt);
1634         break;
1635     case CF_MAXIMUM:
1636         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1637         cur_val = IFDNAN(pdp_temp_val, -DINF);
1638 #if 0
1639 #ifdef DEBUG
1640         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1641             fprintf(stderr,
1642                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1643                     i, ii);
1644             exit(-1);
1645         }
1646 #endif
1647 #endif
1648         if (cur_val > cum_val)
1649             scratch[CDP_primary_val].u_val = cur_val;
1650         else
1651             scratch[CDP_primary_val].u_val = cum_val;
1652         /* initialize carry over value */
1653         scratch[CDP_val].u_val = pdp_temp_val;
1654         break;
1655     case CF_MINIMUM:
1656         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1657         cur_val = IFDNAN(pdp_temp_val, DINF);
1658 #if 0
1659 #ifdef DEBUG
1660         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1661             fprintf(stderr,
1662                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1663                     ii);
1664             exit(-1);
1665         }
1666 #endif
1667 #endif
1668         if (cur_val < cum_val)
1669             scratch[CDP_primary_val].u_val = cur_val;
1670         else
1671             scratch[CDP_primary_val].u_val = cum_val;
1672         /* initialize carry over value */
1673         scratch[CDP_val].u_val = pdp_temp_val;
1674         break;
1675     case CF_LAST:
1676     default:
1677         scratch[CDP_primary_val].u_val = pdp_temp_val;
1678         /* initialize carry over value */
1679         scratch[CDP_val].u_val = pdp_temp_val;
1680         break;
1681     }
1682 }
1683
1684 /*
1685  * Update the consolidation function for Holt-Winters functions as
1686  * well as other functions that don't actually consolidate multiple
1687  * PDPs.
1688  */
1689 static void reset_cdp(
1690     rrd_t *rrd,
1691     unsigned long elapsed_pdp_st,
1692     rrd_value_t *pdp_temp,
1693     rrd_value_t *last_seasonal_coef,
1694     rrd_value_t *seasonal_coef,
1695     int rra_idx,
1696     int ds_idx,
1697     int cdp_idx,
1698     enum cf_en current_cf)
1699 {
1700     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1701
1702     switch (current_cf) {
1703     case CF_AVERAGE:
1704     default:
1705         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1706         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1707         break;
1708     case CF_SEASONAL:
1709     case CF_DEVSEASONAL:
1710         /* need to update cached seasonal values, so they are consistent
1711          * with the bulk update */
1712         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1713          * CDP_last_deviation are the same. */
1714         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1715         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1716         break;
1717     case CF_HWPREDICT:
1718     case CF_MHWPREDICT:
1719         /* need to update the null_count and last_null_count.
1720          * even do this for non-DNAN pdp_temp because the
1721          * algorithm is not learning from batch updates. */
1722         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1723         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1724         /* fall through */
1725     case CF_DEVPREDICT:
1726         scratch[CDP_primary_val].u_val = DNAN;
1727         scratch[CDP_secondary_val].u_val = DNAN;
1728         break;
1729     case CF_FAILURES:
1730         /* do not count missed bulk values as failures */
1731         scratch[CDP_primary_val].u_val = 0;
1732         scratch[CDP_secondary_val].u_val = 0;
1733         /* need to reset violations buffer.
1734          * could do this more carefully, but for now, just
1735          * assume a bulk update wipes away all violations. */
1736         erase_violations(rrd, cdp_idx, rra_idx);
1737         break;
1738     }
1739 }
1740
1741 static rrd_value_t initialize_average_carry_over(
1742     rrd_value_t pdp_temp_val,
1743     unsigned long elapsed_pdp_st,
1744     unsigned long start_pdp_offset,
1745     unsigned long pdp_cnt)
1746 {
1747     /* initialize carry over value */
1748     if (isnan(pdp_temp_val)) {
1749         return DNAN;
1750     }
1751     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1752 }
1753
1754 /*
1755  * Update or initialize a CDP value based on the consolidation
1756  * function.
1757  *
1758  * Returns the new value.
1759  */
1760 static rrd_value_t calculate_cdp_val(
1761     rrd_value_t cdp_val,
1762     rrd_value_t pdp_temp_val,
1763     unsigned long elapsed_pdp_st,
1764     int current_cf,
1765 #ifdef DEBUG
1766     int i,
1767     int ii
1768 #else
1769     int UNUSED(i),
1770     int UNUSED(ii)
1771 #endif
1772     )
1773 {
1774     if (isnan(cdp_val)) {
1775         if (current_cf == CF_AVERAGE) {
1776             pdp_temp_val *= elapsed_pdp_st;
1777         }
1778 #ifdef DEBUG
1779         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1780                 i, ii, pdp_temp_val);
1781 #endif
1782         return pdp_temp_val;
1783     }
1784     if (current_cf == CF_AVERAGE)
1785         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1786     if (current_cf == CF_MINIMUM)
1787         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1788     if (current_cf == CF_MAXIMUM)
1789         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1790
1791     return pdp_temp_val;
1792 }
1793
1794 /*
1795  * For each RRA, update the seasonal values and then call update_aberrant_CF
1796  * for each data source.
1797  *
1798  * Return 0 on success, -1 on error.
1799  */
1800 static int update_aberrant_cdps(
1801     rrd_t *rrd,
1802     rrd_file_t *rrd_file,
1803     unsigned long rra_begin,
1804     unsigned long *rra_current,
1805     unsigned long elapsed_pdp_st,
1806     rrd_value_t *pdp_temp,
1807     rrd_value_t **seasonal_coef)
1808 {
1809     unsigned long rra_idx, ds_idx, j;
1810
1811     /* number of PDP steps since the last update that
1812      * are assigned to the first CDP to be generated
1813      * since the last update. */
1814     unsigned short scratch_idx;
1815     unsigned long rra_start;
1816     enum cf_en current_cf;
1817
1818     /* this loop is only entered if elapsed_pdp_st < 3 */
1819     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1820          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1821         rra_start = rra_begin;
1822         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1823             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1824                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1825                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1826                     if (scratch_idx == CDP_primary_val) {
1827                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1828                                         elapsed_pdp_st + 1, seasonal_coef);
1829                     } else {
1830                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1831                                         elapsed_pdp_st + 2, seasonal_coef);
1832                     }
1833                     *rra_current = rrd_tell(rrd_file);
1834                 }
1835                 if (rrd_test_error())
1836                     return -1;
1837                 /* loop over data soures within each RRA */
1838                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1839                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1840                                        rra_idx * (rrd->stat_head->ds_cnt) +
1841                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1842                                        *seasonal_coef);
1843                 }
1844             }
1845             rra_start += rrd->rra_def[rra_idx].row_cnt
1846                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1847         }
1848     }
1849     return 0;
1850 }
1851
1852 /* 
1853  * Move sequentially through the file, writing one RRA at a time.  Note this
1854  * architecture divorces the computation of CDP with flushing updated RRA
1855  * entries to disk.
1856  *
1857  * Return 0 on success, -1 on error.
1858  */
1859 static int write_to_rras(
1860     rrd_t *rrd,
1861     rrd_file_t *rrd_file,
1862     unsigned long *rra_step_cnt,
1863     unsigned long rra_begin,
1864     unsigned long *rra_current,
1865     time_t current_time,
1866     unsigned long *skip_update,
1867     rrd_info_t ** pcdp_summary)
1868 {
1869     unsigned long rra_idx;
1870     unsigned long rra_start;
1871     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1872     time_t    rra_time = 0; /* time of update for a RRA */
1873
1874     /* Ready to write to disk */
1875     rra_start = rra_begin;
1876     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877         /* skip unless there's something to write */
1878         if (rra_step_cnt[rra_idx]) {
1879             /* write the first row */
1880 #ifdef DEBUG
1881             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1882 #endif
1883             rrd->rra_ptr[rra_idx].cur_row++;
1884             if (rrd->rra_ptr[rra_idx].cur_row >=
1885                 rrd->rra_def[rra_idx].row_cnt)
1886                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1887             /* position on the first row */
1888             rra_pos_tmp = rra_start +
1889                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1890                 sizeof(rrd_value_t);
1891             if (rra_pos_tmp != *rra_current) {
1892                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1893                     rrd_set_error("seek error in rrd");
1894                     return -1;
1895                 }
1896                 *rra_current = rra_pos_tmp;
1897             }
1898 #ifdef DEBUG
1899             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1900 #endif
1901             if (!skip_update[rra_idx]) {
1902                 if (*pcdp_summary != NULL) {
1903                     rra_time = (current_time - current_time
1904                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1905                                    rrd->stat_head->pdp_step))
1906                         -
1907                         ((rra_step_cnt[rra_idx] -
1908                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1909                          rrd->stat_head->pdp_step);
1910                 }
1911                 if (write_RRA_row
1912                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1913                      pcdp_summary, rra_time) == -1)
1914                     return -1;
1915             }
1916
1917             /* write other rows of the bulk update, if any */
1918             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1919                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1920                     rrd->rra_def[rra_idx].row_cnt) {
1921 #ifdef DEBUG
1922                     fprintf(stderr,
1923                             "Wraparound for RRA %s, %lu updates left\n",
1924                             rrd->rra_def[rra_idx].cf_nam,
1925                             rra_step_cnt[rra_idx] - 1);
1926 #endif
1927                     /* wrap */
1928                     rrd->rra_ptr[rra_idx].cur_row = 0;
1929                     /* seek back to beginning of current rra */
1930                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1931                         rrd_set_error("seek error in rrd");
1932                         return -1;
1933                     }
1934 #ifdef DEBUG
1935                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1936                             rrd_file->pos);
1937 #endif
1938                     *rra_current = rra_start;
1939                 }
1940                 if (!skip_update[rra_idx]) {
1941                     if (*pcdp_summary != NULL) {
1942                         rra_time = (current_time - current_time
1943                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1944                                        rrd->stat_head->pdp_step))
1945                             -
1946                             ((rra_step_cnt[rra_idx] -
1947                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1948                              rrd->stat_head->pdp_step);
1949                     }
1950                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1951                                       CDP_secondary_val, pcdp_summary,
1952                                       rra_time) == -1)
1953                         return -1;
1954                 }
1955             }
1956         }
1957         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1958             sizeof(rrd_value_t);
1959     }                   /* RRA LOOP */
1960
1961     return 0;
1962 }
1963
1964 /*
1965  * Write out one row of values (one value per DS) to the archive.
1966  *
1967  * Returns 0 on success, -1 on error.
1968  */
1969 static int write_RRA_row(
1970     rrd_file_t *rrd_file,
1971     rrd_t *rrd,
1972     unsigned long rra_idx,
1973     unsigned long *rra_current,
1974     unsigned short CDP_scratch_idx,
1975     rrd_info_t ** pcdp_summary,
1976     time_t rra_time)
1977 {
1978     unsigned long ds_idx, cdp_idx;
1979     rrd_infoval_t iv;
1980
1981     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1982         /* compute the cdp index */
1983         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1984 #ifdef DEBUG
1985         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1986                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1987                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1988 #endif
1989         if (*pcdp_summary != NULL) {
1990             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1991             /* append info to the return hash */
1992             *pcdp_summary = rrd_info_push(*pcdp_summary,
1993                                           sprintf_alloc
1994                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1995                                            rrd->rra_def[rra_idx].cf_nam,
1996                                            rrd->rra_def[rra_idx].pdp_cnt,
1997                                            rrd->ds_def[ds_idx].ds_nam),
1998                                           RD_I_VAL, iv);
1999         }
2000         if (rrd_write(rrd_file,
2001                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2002                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2003             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2004             return -1;
2005         }
2006         *rra_current += sizeof(rrd_value_t);
2007     }
2008     return 0;
2009 }
2010
2011 /*
2012  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2013  *
2014  * Returns 0 on success, -1 otherwise
2015  */
2016 static int smooth_all_rras(
2017     rrd_t *rrd,
2018     rrd_file_t *rrd_file,
2019     unsigned long rra_begin)
2020 {
2021     unsigned long rra_start = rra_begin;
2022     unsigned long rra_idx;
2023
2024     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2025         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2026             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2027 #ifdef DEBUG
2028             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2029 #endif
2030             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2031             if (rrd_test_error())
2032                 return -1;
2033         }
2034         rra_start += rrd->rra_def[rra_idx].row_cnt
2035             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2036     }
2037     return 0;
2038 }
2039
2040 #ifndef HAVE_MMAP
2041 /*
2042  * Flush changes to disk (unless we're using mmap)
2043  *
2044  * Returns 0 on success, -1 otherwise
2045  */
2046 static int write_changes_to_disk(
2047     rrd_t *rrd,
2048     rrd_file_t *rrd_file,
2049     int version)
2050 {
2051     /* we just need to write back the live header portion now */
2052     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2053                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2054                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2055                  SEEK_SET) != 0) {
2056         rrd_set_error("seek rrd for live header writeback");
2057         return -1;
2058     }
2059     if (version >= 3) {
2060         if (rrd_write(rrd_file, rrd->live_head,
2061                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2062             rrd_set_error("rrd_write live_head to rrd");
2063             return -1;
2064         }
2065     } else {
2066         if (rrd_write(rrd_file, rrd->legacy_last_up,
2067                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2068             rrd_set_error("rrd_write live_head to rrd");
2069             return -1;
2070         }
2071     }
2072
2073
2074     if (rrd_write(rrd_file, rrd->pdp_prep,
2075                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2076         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2077         rrd_set_error("rrd_write pdp_prep to rrd");
2078         return -1;
2079     }
2080
2081     if (rrd_write(rrd_file, rrd->cdp_prep,
2082                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2083                   rrd->stat_head->ds_cnt)
2084         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2085                       rrd->stat_head->ds_cnt)) {
2086
2087         rrd_set_error("rrd_write cdp_prep to rrd");
2088         return -1;
2089     }
2090
2091     if (rrd_write(rrd_file, rrd->rra_ptr,
2092                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2093         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2094         rrd_set_error("rrd_write rra_ptr to rrd");
2095         return -1;
2096     }
2097     return 0;
2098 }
2099 #endif