158b4ae70e28d064c2701ed991bb16b8fe0e6c94
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     unsigned long *rra_current,
99     time_t *current_time,
100     unsigned long *current_time_usec,
101     rrd_value_t *pdp_temp,
102     rrd_value_t *pdp_new,
103     unsigned long *rra_step_cnt,
104     char **updvals,
105     long *tmpl_idx,
106     unsigned long tmpl_cnt,
107     info_t **pcdp_summary,
108     int version,
109     unsigned long *skip_update,
110     int *schedule_smooth);
111
112 static int parse_ds(
113     rrd_t *rrd,
114     char **updvals,
115     long *tmpl_idx,
116     char *input,
117     unsigned long tmpl_cnt,
118     time_t *current_time,
119     unsigned long *current_time_usec,
120     int version);
121
122 static int get_time_from_reading(
123     rrd_t *rrd,
124     char timesyntax,
125     char **updvals,
126     time_t *current_time,
127     unsigned long *current_time_usec,
128     int version);
129
130 static int update_pdp_prep(
131     rrd_t *rrd,
132     char **updvals,
133     rrd_value_t *pdp_new,
134     double interval);
135
136 static int calculate_elapsed_steps(
137     rrd_t *rrd,
138     unsigned long current_time,
139     unsigned long current_time_usec,
140     double interval,
141     double *pre_int,
142     double *post_int,
143     unsigned long *proc_pdp_cnt);
144
145 static void simple_update(
146     rrd_t *rrd,
147     double interval,
148     rrd_value_t *pdp_new);
149
150 static int process_all_pdp_st(
151     rrd_t *rrd,
152     double interval,
153     double pre_int,
154     double post_int,
155     unsigned long elapsed_pdp_st,
156     rrd_value_t *pdp_new,
157     rrd_value_t *pdp_temp);
158
159 static int process_pdp_st(
160     rrd_t *rrd,
161     unsigned long ds_idx,
162     double interval,
163     double pre_int,
164     double post_int,
165     long diff_pdp_st,
166     rrd_value_t *pdp_new,
167     rrd_value_t *pdp_temp);
168
169 static int update_all_cdp_prep(
170     rrd_t *rrd,
171     unsigned long *rra_step_cnt,
172     unsigned long rra_begin,
173     rrd_file_t *rrd_file,
174     unsigned long elapsed_pdp_st,
175     unsigned long proc_pdp_cnt,
176     rrd_value_t **last_seasonal_coef,
177     rrd_value_t **seasonal_coef,
178     rrd_value_t *pdp_temp,
179     unsigned long *rra_current,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long *rra_current,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     unsigned long *rra_current,
259     time_t current_time,
260     unsigned long *skip_update,
261     info_t **pcdp_summary);
262
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned long *rra_current,
268     unsigned short CDP_scratch_idx,
269     info_t **pcdp_summary,
270     time_t rra_time);
271
272 static int smooth_all_rras(
273     rrd_t *rrd,
274     rrd_file_t *rrd_file,
275     unsigned long rra_begin);
276
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279     rrd_t *rrd,
280     rrd_file_t *rrd_file,
281     int version);
282 #endif
283
284 /*
285  * normalize time as returned by gettimeofday. usec part must
286  * be always >= 0
287  */
288 static inline void normalize_time(
289     struct timeval *t)
290 {
291     if (t->tv_usec < 0) {
292         t->tv_sec--;
293         t->tv_usec += 1e6L;
294     }
295 }
296
297 /*
298  * Sets current_time and current_time_usec based on the current time.
299  * current_time_usec is set to 0 if the version number is 1 or 2.
300  */
301 static inline void initialize_time(
302     time_t *current_time,
303     unsigned long *current_time_usec,
304     int version)
305 {
306     struct timeval tmp_time;    /* used for time conversion */
307
308     gettimeofday(&tmp_time, 0);
309     normalize_time(&tmp_time);
310     *current_time = tmp_time.tv_sec;
311     if (version >= 3) {
312         *current_time_usec = tmp_time.tv_usec;
313     } else {
314         *current_time_usec = 0;
315     }
316 }
317
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319
320 info_t   *rrd_update_v(
321     int argc,
322     char **argv)
323 {
324     char     *tmplt = NULL;
325     info_t   *result = NULL;
326     infoval   rc;
327     struct option long_options[] = {
328         {"template", required_argument, 0, 't'},
329         {0, 0, 0, 0}
330     };
331
332     rc.u_int = -1;
333     optind = 0;
334     opterr = 0;         /* initialize getopt */
335
336     while (1) {
337         int       option_index = 0;
338         int       opt;
339
340         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341
342         if (opt == EOF)
343             break;
344
345         switch (opt) {
346         case 't':
347             tmplt = optarg;
348             break;
349
350         case '?':
351             rrd_set_error("unknown option '%s'", argv[optind - 1]);
352             goto end_tag;
353         }
354     }
355
356     /* need at least 2 arguments: filename, data. */
357     if (argc - optind < 2) {
358         rrd_set_error("Not enough arguments");
359         goto end_tag;
360     }
361     rc.u_int = 0;
362     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363     rc.u_int = _rrd_update(argv[optind], tmplt,
364                            argc - optind - 1,
365                            (const char **) (argv + optind + 1), result);
366     result->value.u_int = rc.u_int;
367   end_tag:
368     return result;
369 }
370
371 int rrd_update(
372     int argc,
373     char **argv)
374 {
375     struct option long_options[] = {
376         {"template", required_argument, 0, 't'},
377         {0, 0, 0, 0}
378     };
379     int       option_index = 0;
380     int       opt;
381     char     *tmplt = NULL;
382     int       rc = -1;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case '?':
399             rrd_set_error("unknown option '%s'", argv[optind - 1]);
400             goto out;
401         }
402     }
403
404     /* need at least 2 arguments: filename, data. */
405     if (argc - optind < 2) {
406         rrd_set_error("Not enough arguments");
407         goto out;
408     }
409
410     rc = rrd_update_r(argv[optind], tmplt,
411                       argc - optind - 1, (const char **) (argv + optind + 1));
412   out:
413     free(tmplt);
414     return rc;
415 }
416
417 int rrd_update_r(
418     const char *filename,
419     const char *tmplt,
420     int argc,
421     const char **argv)
422 {
423     return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
425
426 int _rrd_update(
427     const char *filename,
428     const char *tmplt,
429     int argc,
430     const char **argv,
431     info_t *pcdp_summary)
432 {
433
434     int       arg_i = 2;
435
436     unsigned long rra_begin;    /* byte pointer to the rra
437                                  * area in the rrd file.  this
438                                  * pointer never changes value */
439     unsigned long rra_current;  /* byte pointer to the current write
440                                  * spot in the rrd file. */
441     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
442                              * to the existing entry */
443     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
444                              * to the cdp values */
445
446     long     *tmpl_idx; /* index representing the settings
447                          * transported by the tmplt index */
448     unsigned long tmpl_cnt = 2; /* time and data */
449     rrd_t     rrd;
450     time_t    current_time = 0;
451     unsigned long current_time_usec = 0;    /* microseconds part of current time */
452     char    **updvals;
453     int       schedule_smooth = 0;
454
455     /* number of elapsed PDP steps since last update */
456     unsigned long *rra_step_cnt = NULL;
457
458     int       version;  /* rrd version */
459     rrd_file_t *rrd_file;
460     char     *arg_copy; /* for processing the argv */
461     unsigned long *skip_update; /* RRAs to advance but not write */
462
463     /* need at least 1 arguments: data. */
464     if (argc < 1) {
465         rrd_set_error("Not enough arguments");
466         goto err_out;
467     }
468
469     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470         goto err_free;
471     }
472     /* We are now at the beginning of the rra's */
473     rra_current = rra_begin = rrd_file->header_len;
474
475     version = atoi(rrd.stat_head->version);
476
477     initialize_time(&current_time, &current_time_usec, version);
478
479     /* get exclusive lock to whole file.
480      * lock gets removed when we close the file.
481      */
482     if (LockRRD(rrd_file->fd) != 0) {
483         rrd_set_error("could not lock RRD");
484         goto err_close;
485     }
486
487     if (allocate_data_structures(&rrd, &updvals,
488                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489                                  &rra_step_cnt, &skip_update,
490                                  &pdp_new) == -1) {
491         goto err_close;
492     }
493
494     /* loop through the arguments. */
495     for (arg_i = 0; arg_i < argc; arg_i++) {
496         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497             rrd_set_error("failed duplication argv entry");
498             break;
499         }
500         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501                         &current_time, &current_time_usec, pdp_temp, pdp_new,
502                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503                         &pcdp_summary, version, skip_update,
504                         &schedule_smooth) == -1) {
505             free(arg_copy);
506             break;
507         }
508         free(arg_copy);
509     }
510
511     free(rra_step_cnt);
512
513     /* if we got here and if there is an error and if the file has not been
514      * written to, then close things up and return. */
515     if (rrd_test_error()) {
516         goto err_free_structures;
517     }
518 #ifndef HAVE_MMAP
519     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
520         goto err_free_structures;
521     }
522 #endif
523
524     /* calling the smoothing code here guarantees at most one smoothing
525      * operation per rrd_update call. Unfortunately, it is possible with bulk
526      * updates, or a long-delayed update for smoothing to occur off-schedule.
527      * This really isn't critical except during the burn-in cycles. */
528     if (schedule_smooth) {
529         smooth_all_rras(&rrd, rrd_file, rra_begin);
530     }
531
532 /*    rrd_dontneed(rrd_file,&rrd); */
533     rrd_free(&rrd);
534     rrd_close(rrd_file);
535
536     free(pdp_new);
537     free(tmpl_idx);
538     free(pdp_temp);
539     free(skip_update);
540     free(updvals);
541     return 0;
542
543   err_free_structures:
544     free(pdp_new);
545     free(tmpl_idx);
546     free(pdp_temp);
547     free(skip_update);
548     free(updvals);
549   err_close:
550     rrd_close(rrd_file);
551   err_free:
552     rrd_free(&rrd);
553   err_out:
554     return -1;
555 }
556
557 /*
558  * get exclusive lock to whole file.
559  * lock gets removed when we close the file
560  *
561  * returns 0 on success
562  */
563 int LockRRD(
564     int in_file)
565 {
566     int       rcstat;
567
568     {
569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
570         struct _stat st;
571
572         if (_fstat(in_file, &st) == 0) {
573             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
574         } else {
575             rcstat = -1;
576         }
577 #else
578         struct flock lock;
579
580         lock.l_type = F_WRLCK;  /* exclusive write lock */
581         lock.l_len = 0; /* whole file */
582         lock.l_start = 0;   /* start of file */
583         lock.l_whence = SEEK_SET;   /* end of file */
584
585         rcstat = fcntl(in_file, F_SETLK, &lock);
586 #endif
587     }
588
589     return (rcstat);
590 }
591
592 /*
593  * Allocate some important arrays used, and initialize the template.
594  *
595  * When it returns, either all of the structures are allocated
596  * or none of them are.
597  *
598  * Returns 0 on success, -1 on error.
599  */
600 static int allocate_data_structures(
601     rrd_t *rrd,
602     char ***updvals,
603     rrd_value_t **pdp_temp,
604     const char *tmplt,
605     long **tmpl_idx,
606     unsigned long *tmpl_cnt,
607     unsigned long **rra_step_cnt,
608     unsigned long **skip_update,
609     rrd_value_t **pdp_new)
610 {
611     unsigned  i, ii;
612     if ((*updvals = (char **) malloc(sizeof(char *)
613                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
614         rrd_set_error("allocating updvals pointer array.");
615         return -1;
616     }
617     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
618                                             * rrd->stat_head->ds_cnt)) ==
619         NULL) {
620         rrd_set_error("allocating pdp_temp.");
621         goto err_free_updvals;
622     }
623     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
624                                                  *
625                                                  rrd->stat_head->rra_cnt)) ==
626         NULL) {
627         rrd_set_error("allocating skip_update.");
628         goto err_free_pdp_temp;
629     }
630     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
631                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
632         rrd_set_error("allocating tmpl_idx.");
633         goto err_free_skip_update;
634     }
635     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
636                                                   *
637                                                   (rrd->stat_head->
638                                                    rra_cnt))) == NULL) {
639         rrd_set_error("allocating rra_step_cnt.");
640         goto err_free_tmpl_idx;
641     }
642
643     /* initialize tmplt redirector */
644     /* default config example (assume DS 1 is a CDEF DS)
645        tmpl_idx[0] -> 0; (time)
646        tmpl_idx[1] -> 1; (DS 0)
647        tmpl_idx[2] -> 3; (DS 2)
648        tmpl_idx[3] -> 4; (DS 3) */
649     (*tmpl_idx)[0] = 0; /* time */
650     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
651         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
652             (*tmpl_idx)[ii++] = i;
653     }
654     *tmpl_cnt = ii;
655
656     if (tmplt != NULL) {
657         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
658             goto err_free_rra_step_cnt;
659         }
660     }
661
662     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
663                                            * rrd->stat_head->ds_cnt)) == NULL) {
664         rrd_set_error("allocating pdp_new.");
665         goto err_free_rra_step_cnt;
666     }
667
668     return 0;
669
670   err_free_rra_step_cnt:
671     free(*rra_step_cnt);
672   err_free_tmpl_idx:
673     free(*tmpl_idx);
674   err_free_skip_update:
675     free(*skip_update);
676   err_free_pdp_temp:
677     free(*pdp_temp);
678   err_free_updvals:
679     free(*updvals);
680     return -1;
681 }
682
683 /*
684  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
685  *
686  * Returns 0 on success.
687  */
688 static int parse_template(
689     rrd_t *rrd,
690     const char *tmplt,
691     unsigned long *tmpl_cnt,
692     long *tmpl_idx)
693 {
694     char     *dsname, *tmplt_copy;
695     unsigned int tmpl_len, i;
696     int       ret = 0;
697
698     *tmpl_cnt = 1;      /* the first entry is the time */
699
700     /* we should work on a writeable copy here */
701     if ((tmplt_copy = strdup(tmplt)) == NULL) {
702         rrd_set_error("error copying tmplt '%s'", tmplt);
703         ret = -1;
704         goto out;
705     }
706
707     dsname = tmplt_copy;
708     tmpl_len = strlen(tmplt_copy);
709     for (i = 0; i <= tmpl_len; i++) {
710         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
711             tmplt_copy[i] = '\0';
712             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
713                 rrd_set_error("tmplt contains more DS definitions than RRD");
714                 ret = -1;
715                 goto out_free_tmpl_copy;
716             }
717             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
718                 rrd_set_error("unknown DS name '%s'", dsname);
719                 ret = -1;
720                 goto out_free_tmpl_copy;
721             }
722             /* go to the next entry on the tmplt_copy */
723             if (i < tmpl_len)
724                 dsname = &tmplt_copy[i + 1];
725         }
726     }
727   out_free_tmpl_copy:
728     free(tmplt_copy);
729   out:
730     return ret;
731 }
732
733 /*
734  * Parse an update string, updates the primary data points (PDPs)
735  * and consolidated data points (CDPs), and writes changes to the RRAs.
736  *
737  * Returns 0 on success, -1 on error.
738  */
739 static int process_arg(
740     char *step_start,
741     rrd_t *rrd,
742     rrd_file_t *rrd_file,
743     unsigned long rra_begin,
744     unsigned long *rra_current,
745     time_t *current_time,
746     unsigned long *current_time_usec,
747     rrd_value_t *pdp_temp,
748     rrd_value_t *pdp_new,
749     unsigned long *rra_step_cnt,
750     char **updvals,
751     long *tmpl_idx,
752     unsigned long tmpl_cnt,
753     info_t **pcdp_summary,
754     int version,
755     unsigned long *skip_update,
756     int *schedule_smooth)
757 {
758     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
759
760     /* a vector of future Holt-Winters seasonal coefs */
761     unsigned long elapsed_pdp_st;
762
763     double    interval, pre_int, post_int;  /* interval between this and
764                                              * the last run */
765     unsigned long proc_pdp_cnt;
766     unsigned long rra_start;
767
768     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769                  current_time, current_time_usec, version) == -1) {
770         return -1;
771     }
772     /* seek to the beginning of the rra's */
773     if (*rra_current != rra_begin) {
774 #ifndef HAVE_MMAP
775         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
776             rrd_set_error("seek error in rrd");
777             return -1;
778         }
779 #endif
780         *rra_current = rra_begin;
781     }
782     rra_start = rra_begin;
783
784     interval = (double) (*current_time - rrd->live_head->last_up)
785         + (double) ((long) *current_time_usec -
786                     (long) rrd->live_head->last_up_usec) / 1e6f;
787
788     /* process the data sources and update the pdp_prep 
789      * area accordingly */
790     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
791         return -1;
792     }
793
794     elapsed_pdp_st = calculate_elapsed_steps(rrd,
795                                              *current_time,
796                                              *current_time_usec, interval,
797                                              &pre_int, &post_int,
798                                              &proc_pdp_cnt);
799
800     /* has a pdp_st moment occurred since the last run ? */
801     if (elapsed_pdp_st == 0) {
802         /* no we have not passed a pdp_st moment. therefore update is simple */
803         simple_update(rrd, interval, pdp_new);
804     } else {
805         /* an pdp_st has occurred. */
806         if (process_all_pdp_st(rrd, interval,
807                                pre_int, post_int,
808                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
809             return -1;
810         }
811         if (update_all_cdp_prep(rrd, rra_step_cnt,
812                                 rra_begin, rrd_file,
813                                 elapsed_pdp_st,
814                                 proc_pdp_cnt,
815                                 &last_seasonal_coef,
816                                 &seasonal_coef,
817                                 pdp_temp, rra_current,
818                                 skip_update, schedule_smooth) == -1) {
819             goto err_free_coefficients;
820         }
821         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
822                                  elapsed_pdp_st, pdp_temp,
823                                  &seasonal_coef) == -1) {
824             goto err_free_coefficients;
825         }
826         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
827                           rra_current, *current_time, skip_update,
828                           pcdp_summary) == -1) {
829             goto err_free_coefficients;
830         }
831     }                   /* endif a pdp_st has occurred */
832     rrd->live_head->last_up = *current_time;
833     rrd->live_head->last_up_usec = *current_time_usec;
834
835     free(seasonal_coef);
836     free(last_seasonal_coef);
837     return 0;
838
839   err_free_coefficients:
840     free(seasonal_coef);
841     free(last_seasonal_coef);
842     return -1;
843 }
844
845 /*
846  * Parse a DS string (time + colon-separated values), storing the
847  * results in current_time, current_time_usec, and updvals.
848  *
849  * Returns 0 on success, -1 on error.
850  */
851 static int parse_ds(
852     rrd_t *rrd,
853     char **updvals,
854     long *tmpl_idx,
855     char *input,
856     unsigned long tmpl_cnt,
857     time_t *current_time,
858     unsigned long *current_time_usec,
859     int version)
860 {
861     char     *p;
862     unsigned long i;
863     char      timesyntax;
864
865     updvals[0] = input;
866     /* initialize all ds input to unknown except the first one
867        which has always got to be set */
868     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
869         updvals[i] = "U";
870
871     /* separate all ds elements; first must be examined separately
872        due to alternate time syntax */
873     if ((p = strchr(input, '@')) != NULL) {
874         timesyntax = '@';
875     } else if ((p = strchr(input, ':')) != NULL) {
876         timesyntax = ':';
877     } else {
878         rrd_set_error("expected timestamp not found in data source from %s",
879                       input);
880         return -1;
881     }
882     *p = '\0';
883     i = 1;
884     updvals[tmpl_idx[i++]] = p + 1;
885     while (*(++p)) {
886         if (*p == ':') {
887             *p = '\0';
888             if (i < tmpl_cnt) {
889                 updvals[tmpl_idx[i++]] = p + 1;
890             }
891         }
892     }
893
894     if (i != tmpl_cnt) {
895         rrd_set_error("expected %lu data source readings (got %lu) from %s",
896                       tmpl_cnt - 1, i, input);
897         return -1;
898     }
899
900     if (get_time_from_reading(rrd, timesyntax, updvals,
901                               current_time, current_time_usec,
902                               version) == -1) {
903         return -1;
904     }
905     return 0;
906 }
907
908 /*
909  * Parse the time in a DS string, store it in current_time and 
910  * current_time_usec and verify that it's later than the last
911  * update for this DS.
912  *
913  * Returns 0 on success, -1 on error.
914  */
915 static int get_time_from_reading(
916     rrd_t *rrd,
917     char timesyntax,
918     char **updvals,
919     time_t *current_time,
920     unsigned long *current_time_usec,
921     int version)
922 {
923     double    tmp;
924     char     *parsetime_error = NULL;
925     char     *old_locale;
926     struct rrd_time_value ds_tv;
927     struct timeval tmp_time;    /* used for time conversion */
928
929     /* get the time from the reading ... handle N */
930     if (timesyntax == '@') {    /* at-style */
931         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
932             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
933             return -1;
934         }
935         if (ds_tv.type == RELATIVE_TO_END_TIME ||
936             ds_tv.type == RELATIVE_TO_START_TIME) {
937             rrd_set_error("specifying time relative to the 'start' "
938                           "or 'end' makes no sense here: %s", updvals[0]);
939             return -1;
940         }
941         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
942         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
943     } else if (strcmp(updvals[0], "N") == 0) {
944         gettimeofday(&tmp_time, 0);
945         normalize_time(&tmp_time);
946         *current_time = tmp_time.tv_sec;
947         *current_time_usec = tmp_time.tv_usec;
948     } else {
949         old_locale = setlocale(LC_NUMERIC, "C");
950         tmp = strtod(updvals[0], 0);
951         setlocale(LC_NUMERIC, old_locale);
952         *current_time = floor(tmp);
953         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
954     }
955     /* dont do any correction for old version RRDs */
956     if (version < 3)
957         *current_time_usec = 0;
958
959     if (*current_time < rrd->live_head->last_up ||
960         (*current_time == rrd->live_head->last_up &&
961          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
962         rrd_set_error("illegal attempt to update using time %ld when "
963                       "last update time is %ld (minimum one second step)",
964                       *current_time, rrd->live_head->last_up);
965         return -1;
966     }
967     return 0;
968 }
969
970 /*
971  * Update pdp_new by interpreting the updvals according to the DS type
972  * (COUNTER, GAUGE, etc.).
973  *
974  * Returns 0 on success, -1 on error.
975  */
976 static int update_pdp_prep(
977     rrd_t *rrd,
978     char **updvals,
979     rrd_value_t *pdp_new,
980     double interval)
981 {
982     unsigned long ds_idx;
983     int       ii;
984     char     *endptr;   /* used in the conversion */
985     double    rate;
986     char     *old_locale;
987     enum dst_en dst_idx;
988
989     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
990         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
991
992         /* make sure we do not build diffs with old last_ds values */
993         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
994             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
995             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
996         }
997
998         /* NOTE: DST_CDEF should never enter this if block, because
999          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1000          * accidently specified a value for the DST_CDEF. To handle this case,
1001          * an extra check is required. */
1002
1003         if ((updvals[ds_idx + 1][0] != 'U') &&
1004             (dst_idx != DST_CDEF) &&
1005             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1006             rate = DNAN;
1007
1008             /* pdp_new contains rate * time ... eg the bytes transferred during
1009              * the interval. Doing it this way saves a lot of math operations
1010              */
1011             switch (dst_idx) {
1012             case DST_COUNTER:
1013             case DST_DERIVE:
1014                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1015                     if ((updvals[ds_idx + 1][ii] < '0'
1016                          || updvals[ds_idx + 1][ii] > '9')
1017                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1018                         rrd_set_error("not a simple integer: '%s'",
1019                                       updvals[ds_idx + 1]);
1020                         return -1;
1021                     }
1022                 }
1023                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1024                     pdp_new[ds_idx] =
1025                         rrd_diff(updvals[ds_idx + 1],
1026                                  rrd->pdp_prep[ds_idx].last_ds);
1027                     if (dst_idx == DST_COUNTER) {
1028                         /* simple overflow catcher. This will fail
1029                          * terribly for non 32 or 64 bit counters
1030                          * ... are there any others in SNMP land?
1031                          */
1032                         if (pdp_new[ds_idx] < (double) 0.0)
1033                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1034                         if (pdp_new[ds_idx] < (double) 0.0)
1035                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1036                     }
1037                     rate = pdp_new[ds_idx] / interval;
1038                 } else {
1039                     pdp_new[ds_idx] = DNAN;
1040                 }
1041                 break;
1042             case DST_ABSOLUTE:
1043                 old_locale = setlocale(LC_NUMERIC, "C");
1044                 errno = 0;
1045                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1046                 setlocale(LC_NUMERIC, old_locale);
1047                 if (errno > 0) {
1048                     rrd_set_error("converting '%s' to float: %s",
1049                                   updvals[ds_idx + 1], rrd_strerror(errno));
1050                     return -1;
1051                 };
1052                 if (endptr[0] != '\0') {
1053                     rrd_set_error
1054                         ("conversion of '%s' to float not complete: tail '%s'",
1055                          updvals[ds_idx + 1], endptr);
1056                     return -1;
1057                 }
1058                 rate = pdp_new[ds_idx] / interval;
1059                 break;
1060             case DST_GAUGE:
1061                 errno = 0;
1062                 old_locale = setlocale(LC_NUMERIC, "C");
1063                 pdp_new[ds_idx] =
1064                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1065                 setlocale(LC_NUMERIC, old_locale);
1066                 if (errno) {
1067                     rrd_set_error("converting '%s' to float: %s",
1068                                   updvals[ds_idx + 1], rrd_strerror(errno));
1069                     return -1;
1070                 };
1071                 if (endptr[0] != '\0') {
1072                     rrd_set_error
1073                         ("conversion of '%s' to float not complete: tail '%s'",
1074                          updvals[ds_idx + 1], endptr);
1075                     return -1;
1076                 }
1077                 rate = pdp_new[ds_idx] / interval;
1078                 break;
1079             default:
1080                 rrd_set_error("rrd contains unknown DS type : '%s'",
1081                               rrd->ds_def[ds_idx].dst);
1082                 return -1;
1083             }
1084             /* break out of this for loop if the error string is set */
1085             if (rrd_test_error()) {
1086                 return -1;
1087             }
1088             /* make sure pdp_temp is neither too large or too small
1089              * if any of these occur it becomes unknown ...
1090              * sorry folks ... */
1091             if (!isnan(rate) &&
1092                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1093                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1094                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1095                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1096                 pdp_new[ds_idx] = DNAN;
1097             }
1098         } else {
1099             /* no news is news all the same */
1100             pdp_new[ds_idx] = DNAN;
1101         }
1102
1103
1104         /* make a copy of the command line argument for the next run */
1105 #ifdef DEBUG
1106         fprintf(stderr, "prep ds[%lu]\t"
1107                 "last_arg '%s'\t"
1108                 "this_arg '%s'\t"
1109                 "pdp_new %10.2f\n",
1110                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1111                 pdp_new[ds_idx]);
1112 #endif
1113         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1114                 LAST_DS_LEN - 1);
1115         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1116     }
1117     return 0;
1118 }
1119
1120 /*
1121  * How many PDP steps have elapsed since the last update? Returns the answer,
1122  * and stores the time between the last update and the last PDP in pre_time,
1123  * and the time between the last PDP and the current time in post_int.
1124  */
1125 static int calculate_elapsed_steps(
1126     rrd_t *rrd,
1127     unsigned long current_time,
1128     unsigned long current_time_usec,
1129     double interval,
1130     double *pre_int,
1131     double *post_int,
1132     unsigned long *proc_pdp_cnt)
1133 {
1134     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1135     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1136                                  * time */
1137     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1138                                  * when it was last updated */
1139     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1140
1141     /* when was the current pdp started */
1142     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1143     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1144
1145     /* when did the last pdp_st occur */
1146     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1147     occu_pdp_st = current_time - occu_pdp_age;
1148
1149     if (occu_pdp_st > proc_pdp_st) {
1150         /* OK we passed the pdp_st moment */
1151         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1152                                                                      * occurred before the latest
1153                                                                      * pdp_st moment*/
1154         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1155         *post_int = occu_pdp_age;   /* how much after it */
1156         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1157     } else {
1158         *pre_int = interval;
1159         *post_int = 0;
1160     }
1161
1162     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1163
1164 #ifdef DEBUG
1165     printf("proc_pdp_age %lu\t"
1166            "proc_pdp_st %lu\t"
1167            "occu_pfp_age %lu\t"
1168            "occu_pdp_st %lu\t"
1169            "int %lf\t"
1170            "pre_int %lf\t"
1171            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1172            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1173 #endif
1174
1175     /* compute the number of elapsed pdp_st moments */
1176     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1177 }
1178
1179 /*
1180  * Increment the PDP values by the values in pdp_new, or else initialize them.
1181  */
1182 static void simple_update(
1183     rrd_t *rrd,
1184     double interval,
1185     rrd_value_t *pdp_new)
1186 {
1187     int       i;
1188
1189     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1190         if (isnan(pdp_new[i])) {
1191             /* this is not really accurate if we use subsecond data arrival time
1192                should have thought of it when going subsecond resolution ...
1193                sorry next format change we will have it! */
1194             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1195                 floor(interval);
1196         } else {
1197             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1198                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1199             } else {
1200                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1201             }
1202         }
1203 #ifdef DEBUG
1204         fprintf(stderr,
1205                 "NO PDP  ds[%i]\t"
1206                 "value %10.2f\t"
1207                 "unkn_sec %5lu\n",
1208                 i,
1209                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1210                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1211 #endif
1212     }
1213 }
1214
1215 /*
1216  * Call process_pdp_st for each DS.
1217  *
1218  * Returns 0 on success, -1 on error.
1219  */
1220 static int process_all_pdp_st(
1221     rrd_t *rrd,
1222     double interval,
1223     double pre_int,
1224     double post_int,
1225     unsigned long elapsed_pdp_st,
1226     rrd_value_t *pdp_new,
1227     rrd_value_t *pdp_temp)
1228 {
1229     unsigned long ds_idx;
1230
1231     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1232        rate*seconds which occurred up to the last run.
1233        pdp_new[] contains rate*seconds from the latest run.
1234        pdp_temp[] will contain the rate for cdp */
1235
1236     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1237         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1238                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1239                            pdp_new, pdp_temp) == -1) {
1240             return -1;
1241         }
1242 #ifdef DEBUG
1243         fprintf(stderr, "PDP UPD ds[%lu]\t"
1244                 "elapsed_pdp_st %lu\t"
1245                 "pdp_temp %10.2f\t"
1246                 "new_prep %10.2f\t"
1247                 "new_unkn_sec %5lu\n",
1248                 ds_idx,
1249                 elapsed_pdp_st,
1250                 pdp_temp[ds_idx],
1251                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1252                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1253 #endif
1254     }
1255     return 0;
1256 }
1257
1258 /*
1259  * Process an update that occurs after one of the PDP moments.
1260  * Increments the PDP value, sets NAN if time greater than the
1261  * heartbeats have elapsed, processes CDEFs.
1262  *
1263  * Returns 0 on success, -1 on error.
1264  */
1265 static int process_pdp_st(
1266     rrd_t *rrd,
1267     unsigned long ds_idx,
1268     double interval,
1269     double pre_int,
1270     double post_int,
1271     long diff_pdp_st, /* number of seconds in full steps passed since last update */
1272     rrd_value_t *pdp_new,
1273     rrd_value_t *pdp_temp)
1274 {
1275     int       i;
1276
1277     /* update pdp_prep to the current pdp_st. */
1278     double    pre_unknown = 0.0;
1279     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1280     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1281
1282     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1283
1284     rpnstack_init(&rpnstack);
1285
1286
1287     if (isnan(pdp_new[ds_idx])) {
1288         /* a final bit of unknown to be added before calculation
1289            we use a temporary variable for this so that we
1290            don't have to turn integer lines before using the value */
1291         pre_unknown = pre_int;
1292     } else {
1293         if (isnan(scratch[PDP_val].u_val)) {
1294             scratch[PDP_val].u_val = 0;
1295         }
1296         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1297     }
1298
1299     /* if too much of the pdp_prep is unknown we dump it */
1300     /* if the interval is larger thatn mrhb we get NAN */
1301     if ((interval > mrhb) ||
1302         (rrd->stat_head->pdp_step/2.0 < (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1303         pdp_temp[ds_idx] = DNAN;
1304     } else {
1305         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1306             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1307              pre_unknown);
1308     }
1309
1310     /* process CDEF data sources; remember each CDEF DS can
1311      * only reference other DS with a lower index number */
1312     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1313         rpnp_t   *rpnp;
1314
1315         rpnp =
1316             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1317         /* substitute data values for OP_VARIABLE nodes */
1318         for (i = 0; rpnp[i].op != OP_END; i++) {
1319             if (rpnp[i].op == OP_VARIABLE) {
1320                 rpnp[i].op = OP_NUMBER;
1321                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1322             }
1323         }
1324         /* run the rpn calculator */
1325         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1326             free(rpnp);
1327             rpnstack_free(&rpnstack);
1328             return -1;
1329         }
1330     }
1331
1332     /* make pdp_prep ready for the next run */
1333     if (isnan(pdp_new[ds_idx])) {
1334         /* this is not realy accurate if we use subsecond data arival time
1335            should have thought of it when going subsecond resolution ...
1336            sorry next format change we will have it! */
1337         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1338         scratch[PDP_val].u_val = DNAN;
1339     } else {
1340         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1341         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1342     }
1343     rpnstack_free(&rpnstack);
1344     return 0;
1345 }
1346
1347 /*
1348  * Iterate over all the RRAs for a given DS and:
1349  * 1. Decide whether to schedule a smooth later
1350  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1351  * 3. Update the CDP
1352  *
1353  * Returns 0 on success, -1 on error
1354  */
1355 static int update_all_cdp_prep(
1356     rrd_t *rrd,
1357     unsigned long *rra_step_cnt,
1358     unsigned long rra_begin,
1359     rrd_file_t *rrd_file,
1360     unsigned long elapsed_pdp_st,
1361     unsigned long proc_pdp_cnt,
1362     rrd_value_t **last_seasonal_coef,
1363     rrd_value_t **seasonal_coef,
1364     rrd_value_t *pdp_temp,
1365     unsigned long *rra_current,
1366     unsigned long *skip_update,
1367     int *schedule_smooth)
1368 {
1369     unsigned long rra_idx;
1370
1371     /* index into the CDP scratch array */
1372     enum cf_en current_cf;
1373     unsigned long rra_start;
1374
1375     /* number of rows to be updated in an RRA for a data value. */
1376     unsigned long start_pdp_offset;
1377
1378     rra_start = rra_begin;
1379     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1380         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1381         start_pdp_offset =
1382             rrd->rra_def[rra_idx].pdp_cnt -
1383             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1384         skip_update[rra_idx] = 0;
1385         if (start_pdp_offset <= elapsed_pdp_st) {
1386             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1387                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1388         } else {
1389             rra_step_cnt[rra_idx] = 0;
1390         }
1391
1392         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1393             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1394              * so that they will be correct for the next observed value; note that for
1395              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1396              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1397             if (rra_step_cnt[rra_idx] > 1) {
1398                 skip_update[rra_idx] = 1;
1399                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1400                                 elapsed_pdp_st, last_seasonal_coef);
1401                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1402                                 elapsed_pdp_st + 1, seasonal_coef);
1403             }
1404             /* periodically run a smoother for seasonal effects */
1405             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1406 #ifdef DEBUG
1407                 fprintf(stderr,
1408                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1409                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1410                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1411                         u_cnt);
1412 #endif
1413                 *schedule_smooth = 1;
1414             }
1415             *rra_current = rrd_tell(rrd_file);
1416         }
1417         if (rrd_test_error())
1418             return -1;
1419
1420         if (update_cdp_prep
1421             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1422              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1423              current_cf) == -1) {
1424             return -1;
1425         }
1426         rra_start +=
1427             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1428             sizeof(rrd_value_t);
1429     }
1430     return 0;
1431 }
1432
1433 /* 
1434  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1435  */
1436 static int do_schedule_smooth(
1437     rrd_t *rrd,
1438     unsigned long rra_idx,
1439     unsigned long elapsed_pdp_st)
1440 {
1441     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1442     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1443     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1444     unsigned long seasonal_smooth_idx =
1445         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1446     unsigned long *init_seasonal =
1447         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1448
1449     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1450      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1451      * really an RRA level, not a data source within RRA level parameter, but
1452      * the rra_def is read only for rrd_update (not flushed to disk). */
1453     if (*init_seasonal > BURNIN_CYCLES) {
1454         /* someone has no doubt invented a trick to deal with this wrap around,
1455          * but at least this code is clear. */
1456         if (seasonal_smooth_idx > cur_row) {
1457             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1458              * between PDP and CDP */
1459             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1460         }
1461         /* can't rely on negative numbers because we are working with
1462          * unsigned values */
1463         return (cur_row + elapsed_pdp_st >= row_cnt
1464                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1465     }
1466     /* mark off one of the burn-in cycles */
1467     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1468 }
1469
1470 /*
1471  * For a given RRA, iterate over the data sources and call the appropriate
1472  * consolidation function.
1473  *
1474  * Returns 0 on success, -1 on error.
1475  */
1476 static int update_cdp_prep(
1477     rrd_t *rrd,
1478     unsigned long elapsed_pdp_st,
1479     unsigned long start_pdp_offset,
1480     unsigned long *rra_step_cnt,
1481     int rra_idx,
1482     rrd_value_t *pdp_temp,
1483     rrd_value_t *last_seasonal_coef,
1484     rrd_value_t *seasonal_coef,
1485     int current_cf)
1486 {
1487     unsigned long ds_idx, cdp_idx;
1488
1489     /* update CDP_PREP areas */
1490     /* loop over data soures within each RRA */
1491     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1492
1493         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1494
1495         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1496             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1497                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1498                        elapsed_pdp_st, start_pdp_offset,
1499                        rrd->rra_def[rra_idx].pdp_cnt,
1500                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1501                        rra_idx, ds_idx);
1502         } else {
1503             /* Nothing to consolidate if there's one PDP per CDP. However, if
1504              * we've missed some PDPs, let's update null counters etc. */
1505             if (elapsed_pdp_st > 2) {
1506                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1507                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1508                           current_cf);
1509             }
1510         }
1511
1512         if (rrd_test_error())
1513             return -1;
1514     }                   /* endif data sources loop */
1515     return 0;
1516 }
1517
1518 /*
1519  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1520  * primary value, secondary value, and # of unknowns.
1521  */
1522 static void update_cdp(
1523     unival *scratch,
1524     int current_cf,
1525     rrd_value_t pdp_temp_val,
1526     unsigned long rra_step_cnt,
1527     unsigned long elapsed_pdp_st,
1528     unsigned long start_pdp_offset,
1529     unsigned long pdp_cnt,
1530     rrd_value_t xff,
1531     int i,
1532     int ii)
1533 {
1534     /* shorthand variables */
1535     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1536     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1537     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1538     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1539
1540     if (rra_step_cnt) {
1541         /* If we are in this block, as least 1 CDP value will be written to
1542          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1543          * to be written, then the "fill in" value is the CDP_secondary_val
1544          * entry. */
1545         if (isnan(pdp_temp_val)) {
1546             *cdp_unkn_pdp_cnt += start_pdp_offset;
1547             *cdp_secondary_val = DNAN;
1548         } else {
1549             /* CDP_secondary value is the RRA "fill in" value for intermediary
1550              * CDP data entries. No matter the CF, the value is the same because
1551              * the average, max, min, and last of a list of identical values is
1552              * the same, namely, the value itself. */
1553             *cdp_secondary_val = pdp_temp_val;
1554         }
1555
1556         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1557             *cdp_primary_val = DNAN;
1558             if (current_cf == CF_AVERAGE) {
1559                 *cdp_val =
1560                     initialize_average_carry_over(pdp_temp_val,
1561                                                   elapsed_pdp_st,
1562                                                   start_pdp_offset, pdp_cnt);
1563             } else {
1564                 *cdp_val = pdp_temp_val;
1565             }
1566         } else {
1567             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1568                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1569         }               /* endif meets xff value requirement for a valid value */
1570         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1571          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1572         if (isnan(pdp_temp_val))
1573             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1574         else
1575             *cdp_unkn_pdp_cnt = 0;
1576     } else {            /* rra_step_cnt[i]  == 0 */
1577
1578 #ifdef DEBUG
1579         if (isnan(*cdp_val)) {
1580             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1581                     i, ii);
1582         } else {
1583             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1584                     i, ii, *cdp_val);
1585         }
1586 #endif
1587         if (isnan(pdp_temp_val)) {
1588             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1589         } else {
1590             *cdp_val =
1591                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1592                                   current_cf, i, ii);
1593         }
1594     }
1595 }
1596
1597 /*
1598  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1599  * on the type of consolidation function.
1600  */
1601 static void initialize_cdp_val(
1602     unival *scratch,
1603     int current_cf,
1604     rrd_value_t pdp_temp_val,
1605     unsigned long elapsed_pdp_st,
1606     unsigned long start_pdp_offset,
1607     unsigned long pdp_cnt)
1608 {
1609     rrd_value_t cum_val, cur_val;
1610
1611     switch (current_cf) {
1612     case CF_AVERAGE:
1613         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1614         cur_val = IFDNAN(pdp_temp_val, 0.0);
1615         scratch[CDP_primary_val].u_val =
1616             (cum_val + cur_val * start_pdp_offset) /
1617             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1618         scratch[CDP_val].u_val =
1619             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1620                                           start_pdp_offset, pdp_cnt);
1621         break;
1622     case CF_MAXIMUM:
1623         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1624         cur_val = IFDNAN(pdp_temp_val, -DINF);
1625 #if 0
1626 #ifdef DEBUG
1627         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1628             fprintf(stderr,
1629                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1630                     i, ii);
1631             exit(-1);
1632         }
1633 #endif
1634 #endif
1635         if (cur_val > cum_val)
1636             scratch[CDP_primary_val].u_val = cur_val;
1637         else
1638             scratch[CDP_primary_val].u_val = cum_val;
1639         /* initialize carry over value */
1640         scratch[CDP_val].u_val = pdp_temp_val;
1641         break;
1642     case CF_MINIMUM:
1643         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1644         cur_val = IFDNAN(pdp_temp_val, DINF);
1645 #if 0
1646 #ifdef DEBUG
1647         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1648             fprintf(stderr,
1649                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1650                     ii);
1651             exit(-1);
1652         }
1653 #endif
1654 #endif
1655         if (cur_val < cum_val)
1656             scratch[CDP_primary_val].u_val = cur_val;
1657         else
1658             scratch[CDP_primary_val].u_val = cum_val;
1659         /* initialize carry over value */
1660         scratch[CDP_val].u_val = pdp_temp_val;
1661         break;
1662     case CF_LAST:
1663     default:
1664         scratch[CDP_primary_val].u_val = pdp_temp_val;
1665         /* initialize carry over value */
1666         scratch[CDP_val].u_val = pdp_temp_val;
1667         break;
1668     }
1669 }
1670
1671 /*
1672  * Update the consolidation function for Holt-Winters functions as
1673  * well as other functions that don't actually consolidate multiple
1674  * PDPs.
1675  */
1676 static void reset_cdp(
1677     rrd_t *rrd,
1678     unsigned long elapsed_pdp_st,
1679     rrd_value_t *pdp_temp,
1680     rrd_value_t *last_seasonal_coef,
1681     rrd_value_t *seasonal_coef,
1682     int rra_idx,
1683     int ds_idx,
1684     int cdp_idx,
1685     enum cf_en current_cf)
1686 {
1687     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1688
1689     switch (current_cf) {
1690     case CF_AVERAGE:
1691     default:
1692         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1693         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1694         break;
1695     case CF_SEASONAL:
1696     case CF_DEVSEASONAL:
1697         /* need to update cached seasonal values, so they are consistent
1698          * with the bulk update */
1699         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1700          * CDP_last_deviation are the same. */
1701         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1702         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1703         break;
1704     case CF_HWPREDICT:
1705     case CF_MHWPREDICT:
1706         /* need to update the null_count and last_null_count.
1707          * even do this for non-DNAN pdp_temp because the
1708          * algorithm is not learning from batch updates. */
1709         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1710         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1711         /* fall through */
1712     case CF_DEVPREDICT:
1713         scratch[CDP_primary_val].u_val = DNAN;
1714         scratch[CDP_secondary_val].u_val = DNAN;
1715         break;
1716     case CF_FAILURES:
1717         /* do not count missed bulk values as failures */
1718         scratch[CDP_primary_val].u_val = 0;
1719         scratch[CDP_secondary_val].u_val = 0;
1720         /* need to reset violations buffer.
1721          * could do this more carefully, but for now, just
1722          * assume a bulk update wipes away all violations. */
1723         erase_violations(rrd, cdp_idx, rra_idx);
1724         break;
1725     }
1726 }
1727
1728 static rrd_value_t initialize_average_carry_over(
1729     rrd_value_t pdp_temp_val,
1730     unsigned long elapsed_pdp_st,
1731     unsigned long start_pdp_offset,
1732     unsigned long pdp_cnt)
1733 {
1734     /* initialize carry over value */
1735     if (isnan(pdp_temp_val)) {
1736         return DNAN;
1737     }
1738     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1739 }
1740
1741 /*
1742  * Update or initialize a CDP value based on the consolidation
1743  * function.
1744  *
1745  * Returns the new value.
1746  */
1747 static rrd_value_t calculate_cdp_val(
1748     rrd_value_t cdp_val,
1749     rrd_value_t pdp_temp_val,
1750     unsigned long elapsed_pdp_st,
1751     int current_cf,
1752 #ifdef DEBUG
1753     int i,
1754     int ii
1755 #else
1756     int UNUSED(i),
1757     int UNUSED(ii)
1758 #endif
1759     )
1760 {
1761     if (isnan(cdp_val)) {
1762         if (current_cf == CF_AVERAGE) {
1763             pdp_temp_val *= elapsed_pdp_st;
1764         }
1765 #ifdef DEBUG
1766         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1767                 i, ii, pdp_temp_val);
1768 #endif
1769         return pdp_temp_val;
1770     }
1771     if (current_cf == CF_AVERAGE)
1772         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1773     if (current_cf == CF_MINIMUM)
1774         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1775     if (current_cf == CF_MAXIMUM)
1776         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1777
1778     return pdp_temp_val;
1779 }
1780
1781 /*
1782  * For each RRA, update the seasonal values and then call update_aberrant_CF
1783  * for each data source.
1784  *
1785  * Return 0 on success, -1 on error.
1786  */
1787 static int update_aberrant_cdps(
1788     rrd_t *rrd,
1789     rrd_file_t *rrd_file,
1790     unsigned long rra_begin,
1791     unsigned long *rra_current,
1792     unsigned long elapsed_pdp_st,
1793     rrd_value_t *pdp_temp,
1794     rrd_value_t **seasonal_coef)
1795 {
1796     unsigned long rra_idx, ds_idx, j;
1797
1798     /* number of PDP steps since the last update that
1799      * are assigned to the first CDP to be generated
1800      * since the last update. */
1801     unsigned short scratch_idx;
1802     unsigned long rra_start;
1803     enum cf_en current_cf;
1804
1805     /* this loop is only entered if elapsed_pdp_st < 3 */
1806     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1807          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1808         rra_start = rra_begin;
1809         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1810             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1811                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1812                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1813                     if (scratch_idx == CDP_primary_val) {
1814                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1815                                         elapsed_pdp_st + 1, seasonal_coef);
1816                     } else {
1817                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1818                                         elapsed_pdp_st + 2, seasonal_coef);
1819                     }
1820                     *rra_current = rrd_tell(rrd_file);
1821                 }
1822                 if (rrd_test_error())
1823                     return -1;
1824                 /* loop over data soures within each RRA */
1825                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1826                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1827                                        rra_idx * (rrd->stat_head->ds_cnt) +
1828                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1829                                        *seasonal_coef);
1830                 }
1831             }
1832             rra_start += rrd->rra_def[rra_idx].row_cnt
1833                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1834         }
1835     }
1836     return 0;
1837 }
1838
1839 /* 
1840  * Move sequentially through the file, writing one RRA at a time.  Note this
1841  * architecture divorces the computation of CDP with flushing updated RRA
1842  * entries to disk.
1843  *
1844  * Return 0 on success, -1 on error.
1845  */
1846 static int write_to_rras(
1847     rrd_t *rrd,
1848     rrd_file_t *rrd_file,
1849     unsigned long *rra_step_cnt,
1850     unsigned long rra_begin,
1851     unsigned long *rra_current,
1852     time_t current_time,
1853     unsigned long *skip_update,
1854     info_t **pcdp_summary)
1855 {
1856     unsigned long rra_idx;
1857     unsigned long rra_start;
1858     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1859     time_t    rra_time = 0; /* time of update for a RRA */
1860
1861     /* Ready to write to disk */
1862     rra_start = rra_begin;
1863     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1864         /* skip unless there's something to write */
1865         if (rra_step_cnt[rra_idx]) {
1866             /* write the first row */
1867 #ifdef DEBUG
1868             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1869 #endif
1870             rrd->rra_ptr[rra_idx].cur_row++;
1871             if (rrd->rra_ptr[rra_idx].cur_row >=
1872                 rrd->rra_def[rra_idx].row_cnt)
1873                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1874             /* position on the first row */
1875             rra_pos_tmp = rra_start +
1876                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1877                 sizeof(rrd_value_t);
1878             if (rra_pos_tmp != *rra_current) {
1879                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1880                     rrd_set_error("seek error in rrd");
1881                     return -1;
1882                 }
1883                 *rra_current = rra_pos_tmp;
1884             }
1885 #ifdef DEBUG
1886             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1887 #endif
1888             if (!skip_update[rra_idx]) {
1889                 if (*pcdp_summary != NULL) {
1890                     rra_time = (current_time - current_time
1891                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1892                                    rrd->stat_head->pdp_step))
1893                         -
1894                         ((rra_step_cnt[rra_idx] -
1895                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1896                          rrd->stat_head->pdp_step);
1897                 }
1898                 if (write_RRA_row
1899                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1900                      pcdp_summary, rra_time) == -1)
1901                     return -1;
1902             }
1903
1904             /* write other rows of the bulk update, if any */
1905             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1906                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1907                     rrd->rra_def[rra_idx].row_cnt) {
1908 #ifdef DEBUG
1909                     fprintf(stderr,
1910                             "Wraparound for RRA %s, %lu updates left\n",
1911                             rrd->rra_def[rra_idx].cf_nam,
1912                             rra_step_cnt[rra_idx] - 1);
1913 #endif
1914                     /* wrap */
1915                     rrd->rra_ptr[rra_idx].cur_row = 0;
1916                     /* seek back to beginning of current rra */
1917                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1918                         rrd_set_error("seek error in rrd");
1919                         return -1;
1920                     }
1921 #ifdef DEBUG
1922                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1923                             rrd_file->pos);
1924 #endif
1925                     *rra_current = rra_start;
1926                 }
1927                 if (!skip_update[rra_idx]) {
1928                     if (*pcdp_summary != NULL) {
1929                         rra_time = (current_time - current_time
1930                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1931                                        rrd->stat_head->pdp_step))
1932                             -
1933                             ((rra_step_cnt[rra_idx] -
1934                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1935                              rrd->stat_head->pdp_step);
1936                     }
1937                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1938                                       CDP_secondary_val, pcdp_summary,
1939                                       rra_time) == -1)
1940                         return -1;
1941                 }
1942             }
1943         }
1944         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1945             sizeof(rrd_value_t);
1946     }                   /* RRA LOOP */
1947
1948     return 0;
1949 }
1950
1951 /*
1952  * Write out one row of values (one value per DS) to the archive.
1953  *
1954  * Returns 0 on success, -1 on error.
1955  */
1956 static int write_RRA_row(
1957     rrd_file_t *rrd_file,
1958     rrd_t *rrd,
1959     unsigned long rra_idx,
1960     unsigned long *rra_current,
1961     unsigned short CDP_scratch_idx,
1962     info_t **pcdp_summary,
1963     time_t rra_time)
1964 {
1965     unsigned long ds_idx, cdp_idx;
1966     infoval   iv;
1967
1968     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1969         /* compute the cdp index */
1970         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1971 #ifdef DEBUG
1972         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1973                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1974                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1975 #endif
1976         if (*pcdp_summary != NULL) {
1977             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1978             /* append info to the return hash */
1979             *pcdp_summary = info_push(*pcdp_summary,
1980                                       sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1981                                                     rra_time,
1982                                                     rrd->rra_def[rra_idx].
1983                                                     cf_nam,
1984                                                     rrd->rra_def[rra_idx].
1985                                                     pdp_cnt,
1986                                                     rrd->ds_def[ds_idx].
1987                                                     ds_nam), RD_I_VAL, iv);
1988         }
1989         if (rrd_write(rrd_file,
1990                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1991                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1992             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1993             return -1;
1994         }
1995         *rra_current += sizeof(rrd_value_t);
1996     }
1997     return 0;
1998 }
1999
2000 /*
2001  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2002  *
2003  * Returns 0 on success, -1 otherwise
2004  */
2005 static int smooth_all_rras(
2006     rrd_t *rrd,
2007     rrd_file_t *rrd_file,
2008     unsigned long rra_begin)
2009 {
2010     unsigned long rra_start = rra_begin;
2011     unsigned long rra_idx;
2012
2013     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2014         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2015             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2016 #ifdef DEBUG
2017             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2018 #endif
2019             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2020             if (rrd_test_error())
2021                 return -1;
2022         }
2023         rra_start += rrd->rra_def[rra_idx].row_cnt
2024             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2025     }
2026     return 0;
2027 }
2028
2029 #ifndef HAVE_MMAP
2030 /*
2031  * Flush changes to disk (unless we're using mmap)
2032  *
2033  * Returns 0 on success, -1 otherwise
2034  */
2035 static int write_changes_to_disk(
2036     rrd_t *rrd,
2037     rrd_file_t *rrd_file,
2038     int version)
2039 {
2040     /* we just need to write back the live header portion now */
2041     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2042                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2043                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2044                  SEEK_SET) != 0) {
2045         rrd_set_error("seek rrd for live header writeback");
2046         return -1;
2047     }
2048     if (version >= 3) {
2049         if (rrd_write(rrd_file, rrd->live_head,
2050                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2051             rrd_set_error("rrd_write live_head to rrd");
2052             return -1;
2053         }
2054     } else {
2055         if (rrd_write(rrd_file, &rrd->live_head->last_up,
2056                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2057             rrd_set_error("rrd_write live_head to rrd");
2058             return -1;
2059         }
2060     }
2061
2062
2063     if (rrd_write(rrd_file, rrd->pdp_prep,
2064                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2065         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2066         rrd_set_error("rrd_write pdp_prep to rrd");
2067         return -1;
2068     }
2069
2070     if (rrd_write(rrd_file, rrd->cdp_prep,
2071                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2072                   rrd->stat_head->ds_cnt)
2073         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2074                       rrd->stat_head->ds_cnt)) {
2075
2076         rrd_set_error("rrd_write cdp_prep to rrd");
2077         return -1;
2078     }
2079
2080     if (rrd_write(rrd_file, rrd->rra_ptr,
2081                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2082         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2083         rrd_set_error("rrd_write rra_ptr to rrd");
2084         return -1;
2085     }
2086     return 0;
2087 }
2088 #endif