Fixed handling of unknown data at PDP build time. There was a long standing
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 #define DEBUG 1
63
64 /* FUNCTION PROTOTYPES */
65
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     info_t *);
77
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
88
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
94
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     unsigned long *rra_current,
101     time_t *current_time,
102     unsigned long *current_time_usec,
103     rrd_value_t *pdp_temp,
104     rrd_value_t *pdp_new,
105     unsigned long *rra_step_cnt,
106     char **updvals,
107     long *tmpl_idx,
108     unsigned long tmpl_cnt,
109     info_t **pcdp_summary,
110     int version,
111     unsigned long *skip_update,
112     int *schedule_smooth);
113
114 static int parse_ds(
115     rrd_t *rrd,
116     char **updvals,
117     long *tmpl_idx,
118     char *input,
119     unsigned long tmpl_cnt,
120     time_t *current_time,
121     unsigned long *current_time_usec,
122     int version);
123
124 static int get_time_from_reading(
125     rrd_t *rrd,
126     char timesyntax,
127     char **updvals,
128     time_t *current_time,
129     unsigned long *current_time_usec,
130     int version);
131
132 static int update_pdp_prep(
133     rrd_t *rrd,
134     char **updvals,
135     rrd_value_t *pdp_new,
136     double interval);
137
138 static int calculate_elapsed_steps(
139     rrd_t *rrd,
140     unsigned long current_time,
141     unsigned long current_time_usec,
142     double interval,
143     double *pre_int,
144     double *post_int,
145     unsigned long *proc_pdp_cnt);
146
147 static void simple_update(
148     rrd_t *rrd,
149     double interval,
150     rrd_value_t *pdp_new);
151
152 static int process_all_pdp_st(
153     rrd_t *rrd,
154     double interval,
155     double pre_int,
156     double post_int,
157     unsigned long elapsed_pdp_st,
158     rrd_value_t *pdp_new,
159     rrd_value_t *pdp_temp);
160
161 static int process_pdp_st(
162     rrd_t *rrd,
163     unsigned long ds_idx,
164     double interval,
165     double pre_int,
166     double post_int,
167     long diff_pdp_st,
168     rrd_value_t *pdp_new,
169     rrd_value_t *pdp_temp);
170
171 static int update_all_cdp_prep(
172     rrd_t *rrd,
173     unsigned long *rra_step_cnt,
174     unsigned long rra_begin,
175     rrd_file_t *rrd_file,
176     unsigned long elapsed_pdp_st,
177     unsigned long proc_pdp_cnt,
178     rrd_value_t **last_seasonal_coef,
179     rrd_value_t **seasonal_coef,
180     rrd_value_t *pdp_temp,
181     unsigned long *rra_current,
182     unsigned long *skip_update,
183     int *schedule_smooth);
184
185 static int do_schedule_smooth(
186     rrd_t *rrd,
187     unsigned long rra_idx,
188     unsigned long elapsed_pdp_st);
189
190 static int update_cdp_prep(
191     rrd_t *rrd,
192     unsigned long elapsed_pdp_st,
193     unsigned long start_pdp_offset,
194     unsigned long *rra_step_cnt,
195     int rra_idx,
196     rrd_value_t *pdp_temp,
197     rrd_value_t *last_seasonal_coef,
198     rrd_value_t *seasonal_coef,
199     int current_cf);
200
201 static void update_cdp(
202     unival *scratch,
203     int current_cf,
204     rrd_value_t pdp_temp_val,
205     unsigned long rra_step_cnt,
206     unsigned long elapsed_pdp_st,
207     unsigned long start_pdp_offset,
208     unsigned long pdp_cnt,
209     rrd_value_t xff,
210     int i,
211     int ii);
212
213 static void initialize_cdp_val(
214     unival *scratch,
215     int current_cf,
216     rrd_value_t pdp_temp_val,
217     unsigned long elapsed_pdp_st,
218     unsigned long start_pdp_offset,
219     unsigned long pdp_cnt);
220
221 static void reset_cdp(
222     rrd_t *rrd,
223     unsigned long elapsed_pdp_st,
224     rrd_value_t *pdp_temp,
225     rrd_value_t *last_seasonal_coef,
226     rrd_value_t *seasonal_coef,
227     int rra_idx,
228     int ds_idx,
229     int cdp_idx,
230     enum cf_en current_cf);
231
232 static rrd_value_t initialize_average_carry_over(
233     rrd_value_t pdp_temp_val,
234     unsigned long elapsed_pdp_st,
235     unsigned long start_pdp_offset,
236     unsigned long pdp_cnt);
237
238 static rrd_value_t calculate_cdp_val(
239     rrd_value_t cdp_val,
240     rrd_value_t pdp_temp_val,
241     unsigned long elapsed_pdp_st,
242     int current_cf,
243     int i,
244     int ii);
245
246 static int update_aberrant_cdps(
247     rrd_t *rrd,
248     rrd_file_t *rrd_file,
249     unsigned long rra_begin,
250     unsigned long *rra_current,
251     unsigned long elapsed_pdp_st,
252     rrd_value_t *pdp_temp,
253     rrd_value_t **seasonal_coef);
254
255 static int write_to_rras(
256     rrd_t *rrd,
257     rrd_file_t *rrd_file,
258     unsigned long *rra_step_cnt,
259     unsigned long rra_begin,
260     unsigned long *rra_current,
261     time_t current_time,
262     unsigned long *skip_update,
263     info_t **pcdp_summary);
264
265 static int write_RRA_row(
266     rrd_file_t *rrd_file,
267     rrd_t *rrd,
268     unsigned long rra_idx,
269     unsigned long *rra_current,
270     unsigned short CDP_scratch_idx,
271     info_t **pcdp_summary,
272     time_t rra_time);
273
274 static int smooth_all_rras(
275     rrd_t *rrd,
276     rrd_file_t *rrd_file,
277     unsigned long rra_begin);
278
279 #ifndef HAVE_MMAP
280 static int write_changes_to_disk(
281     rrd_t *rrd,
282     rrd_file_t *rrd_file,
283     int version);
284 #endif
285
286 /*
287  * normalize time as returned by gettimeofday. usec part must
288  * be always >= 0
289  */
290 static inline void normalize_time(
291     struct timeval *t)
292 {
293     if (t->tv_usec < 0) {
294         t->tv_sec--;
295         t->tv_usec += 1e6L;
296     }
297 }
298
299 /*
300  * Sets current_time and current_time_usec based on the current time.
301  * current_time_usec is set to 0 if the version number is 1 or 2.
302  */
303 static inline void initialize_time(
304     time_t *current_time,
305     unsigned long *current_time_usec,
306     int version)
307 {
308     struct timeval tmp_time;    /* used for time conversion */
309
310     gettimeofday(&tmp_time, 0);
311     normalize_time(&tmp_time);
312     *current_time = tmp_time.tv_sec;
313     if (version >= 3) {
314         *current_time_usec = tmp_time.tv_usec;
315     } else {
316         *current_time_usec = 0;
317     }
318 }
319
320 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
321
322 info_t   *rrd_update_v(
323     int argc,
324     char **argv)
325 {
326     char     *tmplt = NULL;
327     info_t   *result = NULL;
328     infoval   rc;
329     struct option long_options[] = {
330         {"template", required_argument, 0, 't'},
331         {0, 0, 0, 0}
332     };
333
334     rc.u_int = -1;
335     optind = 0;
336     opterr = 0;         /* initialize getopt */
337
338     while (1) {
339         int       option_index = 0;
340         int       opt;
341
342         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
343
344         if (opt == EOF)
345             break;
346
347         switch (opt) {
348         case 't':
349             tmplt = optarg;
350             break;
351
352         case '?':
353             rrd_set_error("unknown option '%s'", argv[optind - 1]);
354             goto end_tag;
355         }
356     }
357
358     /* need at least 2 arguments: filename, data. */
359     if (argc - optind < 2) {
360         rrd_set_error("Not enough arguments");
361         goto end_tag;
362     }
363     rc.u_int = 0;
364     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
365     rc.u_int = _rrd_update(argv[optind], tmplt,
366                            argc - optind - 1,
367                            (const char **) (argv + optind + 1), result);
368     result->value.u_int = rc.u_int;
369   end_tag:
370     return result;
371 }
372
373 int rrd_update(
374     int argc,
375     char **argv)
376 {
377     struct option long_options[] = {
378         {"template", required_argument, 0, 't'},
379         {0, 0, 0, 0}
380     };
381     int       option_index = 0;
382     int       opt;
383     char     *tmplt = NULL;
384     int       rc = -1;
385
386     optind = 0;
387     opterr = 0;         /* initialize getopt */
388
389     while (1) {
390         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
391
392         if (opt == EOF)
393             break;
394
395         switch (opt) {
396         case 't':
397             tmplt = strdup(optarg);
398             break;
399
400         case '?':
401             rrd_set_error("unknown option '%s'", argv[optind - 1]);
402             goto out;
403         }
404     }
405
406     /* need at least 2 arguments: filename, data. */
407     if (argc - optind < 2) {
408         rrd_set_error("Not enough arguments");
409         goto out;
410     }
411
412     rc = rrd_update_r(argv[optind], tmplt,
413                       argc - optind - 1, (const char **) (argv + optind + 1));
414   out:
415     free(tmplt);
416     return rc;
417 }
418
419 int rrd_update_r(
420     const char *filename,
421     const char *tmplt,
422     int argc,
423     const char **argv)
424 {
425     return _rrd_update(filename, tmplt, argc, argv, NULL);
426 }
427
428 int _rrd_update(
429     const char *filename,
430     const char *tmplt,
431     int argc,
432     const char **argv,
433     info_t *pcdp_summary)
434 {
435
436     int       arg_i = 2;
437
438     unsigned long rra_begin;    /* byte pointer to the rra
439                                  * area in the rrd file.  this
440                                  * pointer never changes value */
441     unsigned long rra_current;  /* byte pointer to the current write
442                                  * spot in the rrd file. */
443     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
444                              * to the existing entry */
445     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
446                              * to the cdp values */
447
448     long     *tmpl_idx; /* index representing the settings
449                          * transported by the tmplt index */
450     unsigned long tmpl_cnt = 2; /* time and data */
451     rrd_t     rrd;
452     time_t    current_time = 0;
453     unsigned long current_time_usec = 0;    /* microseconds part of current time */
454     char    **updvals;
455     int       schedule_smooth = 0;
456
457     /* number of elapsed PDP steps since last update */
458     unsigned long *rra_step_cnt = NULL;
459
460     int       version;  /* rrd version */
461     rrd_file_t *rrd_file;
462     char     *arg_copy; /* for processing the argv */
463     unsigned long *skip_update; /* RRAs to advance but not write */
464
465     /* need at least 1 arguments: data. */
466     if (argc < 1) {
467         rrd_set_error("Not enough arguments");
468         goto err_out;
469     }
470
471     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
472         goto err_free;
473     }
474     /* We are now at the beginning of the rra's */
475     rra_current = rra_begin = rrd_file->header_len;
476
477     version = atoi(rrd.stat_head->version);
478
479     initialize_time(&current_time, &current_time_usec, version);
480
481     /* get exclusive lock to whole file.
482      * lock gets removed when we close the file.
483      */
484     if (LockRRD(rrd_file->fd) != 0) {
485         rrd_set_error("could not lock RRD");
486         goto err_close;
487     }
488
489     if (allocate_data_structures(&rrd, &updvals,
490                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
491                                  &rra_step_cnt, &skip_update,
492                                  &pdp_new) == -1) {
493         goto err_close;
494     }
495
496     /* loop through the arguments. */
497     for (arg_i = 0; arg_i < argc; arg_i++) {
498         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
499             rrd_set_error("failed duplication argv entry");
500             break;
501         }
502         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
503                         &current_time, &current_time_usec, pdp_temp, pdp_new,
504                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
505                         &pcdp_summary, version, skip_update,
506                         &schedule_smooth) == -1) {
507             free(arg_copy);
508             break;
509         }
510         free(arg_copy);
511     }
512
513     free(rra_step_cnt);
514
515     /* if we got here and if there is an error and if the file has not been
516      * written to, then close things up and return. */
517     if (rrd_test_error()) {
518         goto err_free_structures;
519     }
520 #ifndef HAVE_MMAP
521     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
522         goto err_free_structures;
523     }
524 #endif
525
526     /* calling the smoothing code here guarantees at most one smoothing
527      * operation per rrd_update call. Unfortunately, it is possible with bulk
528      * updates, or a long-delayed update for smoothing to occur off-schedule.
529      * This really isn't critical except during the burn-in cycles. */
530     if (schedule_smooth) {
531         smooth_all_rras(&rrd, rrd_file, rra_begin);
532     }
533
534 /*    rrd_dontneed(rrd_file,&rrd); */
535     rrd_free(&rrd);
536     rrd_close(rrd_file);
537
538     free(pdp_new);
539     free(tmpl_idx);
540     free(pdp_temp);
541     free(skip_update);
542     free(updvals);
543     return 0;
544
545   err_free_structures:
546     free(pdp_new);
547     free(tmpl_idx);
548     free(pdp_temp);
549     free(skip_update);
550     free(updvals);
551   err_close:
552     rrd_close(rrd_file);
553   err_free:
554     rrd_free(&rrd);
555   err_out:
556     return -1;
557 }
558
559 /*
560  * get exclusive lock to whole file.
561  * lock gets removed when we close the file
562  *
563  * returns 0 on success
564  */
565 int LockRRD(
566     int in_file)
567 {
568     int       rcstat;
569
570     {
571 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
572         struct _stat st;
573
574         if (_fstat(in_file, &st) == 0) {
575             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
576         } else {
577             rcstat = -1;
578         }
579 #else
580         struct flock lock;
581
582         lock.l_type = F_WRLCK;  /* exclusive write lock */
583         lock.l_len = 0; /* whole file */
584         lock.l_start = 0;   /* start of file */
585         lock.l_whence = SEEK_SET;   /* end of file */
586
587         rcstat = fcntl(in_file, F_SETLK, &lock);
588 #endif
589     }
590
591     return (rcstat);
592 }
593
594 /*
595  * Allocate some important arrays used, and initialize the template.
596  *
597  * When it returns, either all of the structures are allocated
598  * or none of them are.
599  *
600  * Returns 0 on success, -1 on error.
601  */
602 static int allocate_data_structures(
603     rrd_t *rrd,
604     char ***updvals,
605     rrd_value_t **pdp_temp,
606     const char *tmplt,
607     long **tmpl_idx,
608     unsigned long *tmpl_cnt,
609     unsigned long **rra_step_cnt,
610     unsigned long **skip_update,
611     rrd_value_t **pdp_new)
612 {
613     unsigned  i, ii;
614     if ((*updvals = (char **) malloc(sizeof(char *)
615                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
616         rrd_set_error("allocating updvals pointer array.");
617         return -1;
618     }
619     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
620                                             * rrd->stat_head->ds_cnt)) ==
621         NULL) {
622         rrd_set_error("allocating pdp_temp.");
623         goto err_free_updvals;
624     }
625     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
626                                                  *
627                                                  rrd->stat_head->rra_cnt)) ==
628         NULL) {
629         rrd_set_error("allocating skip_update.");
630         goto err_free_pdp_temp;
631     }
632     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
633                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
634         rrd_set_error("allocating tmpl_idx.");
635         goto err_free_skip_update;
636     }
637     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
638                                                   *
639                                                   (rrd->stat_head->
640                                                    rra_cnt))) == NULL) {
641         rrd_set_error("allocating rra_step_cnt.");
642         goto err_free_tmpl_idx;
643     }
644
645     /* initialize tmplt redirector */
646     /* default config example (assume DS 1 is a CDEF DS)
647        tmpl_idx[0] -> 0; (time)
648        tmpl_idx[1] -> 1; (DS 0)
649        tmpl_idx[2] -> 3; (DS 2)
650        tmpl_idx[3] -> 4; (DS 3) */
651     (*tmpl_idx)[0] = 0; /* time */
652     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
653         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
654             (*tmpl_idx)[ii++] = i;
655     }
656     *tmpl_cnt = ii;
657
658     if (tmplt != NULL) {
659         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
660             goto err_free_rra_step_cnt;
661         }
662     }
663
664     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
665                                            * rrd->stat_head->ds_cnt)) == NULL) {
666         rrd_set_error("allocating pdp_new.");
667         goto err_free_rra_step_cnt;
668     }
669
670     return 0;
671
672   err_free_rra_step_cnt:
673     free(*rra_step_cnt);
674   err_free_tmpl_idx:
675     free(*tmpl_idx);
676   err_free_skip_update:
677     free(*skip_update);
678   err_free_pdp_temp:
679     free(*pdp_temp);
680   err_free_updvals:
681     free(*updvals);
682     return -1;
683 }
684
685 /*
686  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
687  *
688  * Returns 0 on success.
689  */
690 static int parse_template(
691     rrd_t *rrd,
692     const char *tmplt,
693     unsigned long *tmpl_cnt,
694     long *tmpl_idx)
695 {
696     char     *dsname, *tmplt_copy;
697     unsigned int tmpl_len, i;
698     int       ret = 0;
699
700     *tmpl_cnt = 1;      /* the first entry is the time */
701
702     /* we should work on a writeable copy here */
703     if ((tmplt_copy = strdup(tmplt)) == NULL) {
704         rrd_set_error("error copying tmplt '%s'", tmplt);
705         ret = -1;
706         goto out;
707     }
708
709     dsname = tmplt_copy;
710     tmpl_len = strlen(tmplt_copy);
711     for (i = 0; i <= tmpl_len; i++) {
712         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
713             tmplt_copy[i] = '\0';
714             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
715                 rrd_set_error("tmplt contains more DS definitions than RRD");
716                 ret = -1;
717                 goto out_free_tmpl_copy;
718             }
719             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
720                 rrd_set_error("unknown DS name '%s'", dsname);
721                 ret = -1;
722                 goto out_free_tmpl_copy;
723             }
724             /* go to the next entry on the tmplt_copy */
725             if (i < tmpl_len)
726                 dsname = &tmplt_copy[i + 1];
727         }
728     }
729   out_free_tmpl_copy:
730     free(tmplt_copy);
731   out:
732     return ret;
733 }
734
735 /*
736  * Parse an update string, updates the primary data points (PDPs)
737  * and consolidated data points (CDPs), and writes changes to the RRAs.
738  *
739  * Returns 0 on success, -1 on error.
740  */
741 static int process_arg(
742     char *step_start,
743     rrd_t *rrd,
744     rrd_file_t *rrd_file,
745     unsigned long rra_begin,
746     unsigned long *rra_current,
747     time_t *current_time,
748     unsigned long *current_time_usec,
749     rrd_value_t *pdp_temp,
750     rrd_value_t *pdp_new,
751     unsigned long *rra_step_cnt,
752     char **updvals,
753     long *tmpl_idx,
754     unsigned long tmpl_cnt,
755     info_t **pcdp_summary,
756     int version,
757     unsigned long *skip_update,
758     int *schedule_smooth)
759 {
760     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
761
762     /* a vector of future Holt-Winters seasonal coefs */
763     unsigned long elapsed_pdp_st;
764
765     double    interval, pre_int, post_int;  /* interval between this and
766                                              * the last run */
767     unsigned long proc_pdp_cnt;
768     unsigned long rra_start;
769
770     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
771                  current_time, current_time_usec, version) == -1) {
772         return -1;
773     }
774     /* seek to the beginning of the rra's */
775     if (*rra_current != rra_begin) {
776 #ifndef HAVE_MMAP
777         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
778             rrd_set_error("seek error in rrd");
779             return -1;
780         }
781 #endif
782         *rra_current = rra_begin;
783     }
784     rra_start = rra_begin;
785
786     interval = (double) (*current_time - rrd->live_head->last_up)
787         + (double) ((long) *current_time_usec -
788                     (long) rrd->live_head->last_up_usec) / 1e6f;
789
790     /* process the data sources and update the pdp_prep 
791      * area accordingly */
792     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
793         return -1;
794     }
795
796     elapsed_pdp_st = calculate_elapsed_steps(rrd,
797                                              *current_time,
798                                              *current_time_usec, interval,
799                                              &pre_int, &post_int,
800                                              &proc_pdp_cnt);
801
802     /* has a pdp_st moment occurred since the last run ? */
803     if (elapsed_pdp_st == 0) {
804         /* no we have not passed a pdp_st moment. therefore update is simple */
805         simple_update(rrd, interval, pdp_new);
806     } else {
807         /* an pdp_st has occurred. */
808         if (process_all_pdp_st(rrd, interval,
809                                pre_int, post_int,
810                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
811             return -1;
812         }
813         if (update_all_cdp_prep(rrd, rra_step_cnt,
814                                 rra_begin, rrd_file,
815                                 elapsed_pdp_st,
816                                 proc_pdp_cnt,
817                                 &last_seasonal_coef,
818                                 &seasonal_coef,
819                                 pdp_temp, rra_current,
820                                 skip_update, schedule_smooth) == -1) {
821             goto err_free_coefficients;
822         }
823         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
824                                  elapsed_pdp_st, pdp_temp,
825                                  &seasonal_coef) == -1) {
826             goto err_free_coefficients;
827         }
828         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
829                           rra_current, *current_time, skip_update,
830                           pcdp_summary) == -1) {
831             goto err_free_coefficients;
832         }
833     }                   /* endif a pdp_st has occurred */
834     rrd->live_head->last_up = *current_time;
835     rrd->live_head->last_up_usec = *current_time_usec;
836
837     free(seasonal_coef);
838     free(last_seasonal_coef);
839     return 0;
840
841   err_free_coefficients:
842     free(seasonal_coef);
843     free(last_seasonal_coef);
844     return -1;
845 }
846
847 /*
848  * Parse a DS string (time + colon-separated values), storing the
849  * results in current_time, current_time_usec, and updvals.
850  *
851  * Returns 0 on success, -1 on error.
852  */
853 static int parse_ds(
854     rrd_t *rrd,
855     char **updvals,
856     long *tmpl_idx,
857     char *input,
858     unsigned long tmpl_cnt,
859     time_t *current_time,
860     unsigned long *current_time_usec,
861     int version)
862 {
863     char     *p;
864     unsigned long i;
865     char      timesyntax;
866
867     updvals[0] = input;
868     /* initialize all ds input to unknown except the first one
869        which has always got to be set */
870     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
871         updvals[i] = "U";
872
873     /* separate all ds elements; first must be examined separately
874        due to alternate time syntax */
875     if ((p = strchr(input, '@')) != NULL) {
876         timesyntax = '@';
877     } else if ((p = strchr(input, ':')) != NULL) {
878         timesyntax = ':';
879     } else {
880         rrd_set_error("expected timestamp not found in data source from %s",
881                       input);
882         return -1;
883     }
884     *p = '\0';
885     i = 1;
886     updvals[tmpl_idx[i++]] = p + 1;
887     while (*(++p)) {
888         if (*p == ':') {
889             *p = '\0';
890             if (i < tmpl_cnt) {
891                 updvals[tmpl_idx[i++]] = p + 1;
892             }
893         }
894     }
895
896     if (i != tmpl_cnt) {
897         rrd_set_error("expected %lu data source readings (got %lu) from %s",
898                       tmpl_cnt - 1, i, input);
899         return -1;
900     }
901
902     if (get_time_from_reading(rrd, timesyntax, updvals,
903                               current_time, current_time_usec,
904                               version) == -1) {
905         return -1;
906     }
907     return 0;
908 }
909
910 /*
911  * Parse the time in a DS string, store it in current_time and 
912  * current_time_usec and verify that it's later than the last
913  * update for this DS.
914  *
915  * Returns 0 on success, -1 on error.
916  */
917 static int get_time_from_reading(
918     rrd_t *rrd,
919     char timesyntax,
920     char **updvals,
921     time_t *current_time,
922     unsigned long *current_time_usec,
923     int version)
924 {
925     double    tmp;
926     char     *parsetime_error = NULL;
927     char     *old_locale;
928     struct rrd_time_value ds_tv;
929     struct timeval tmp_time;    /* used for time conversion */
930
931     /* get the time from the reading ... handle N */
932     if (timesyntax == '@') {    /* at-style */
933         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
934             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
935             return -1;
936         }
937         if (ds_tv.type == RELATIVE_TO_END_TIME ||
938             ds_tv.type == RELATIVE_TO_START_TIME) {
939             rrd_set_error("specifying time relative to the 'start' "
940                           "or 'end' makes no sense here: %s", updvals[0]);
941             return -1;
942         }
943         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
944         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
945     } else if (strcmp(updvals[0], "N") == 0) {
946         gettimeofday(&tmp_time, 0);
947         normalize_time(&tmp_time);
948         *current_time = tmp_time.tv_sec;
949         *current_time_usec = tmp_time.tv_usec;
950     } else {
951         old_locale = setlocale(LC_NUMERIC, "C");
952         tmp = strtod(updvals[0], 0);
953         setlocale(LC_NUMERIC, old_locale);
954         *current_time = floor(tmp);
955         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
956     }
957     /* dont do any correction for old version RRDs */
958     if (version < 3)
959         *current_time_usec = 0;
960
961     if (*current_time < rrd->live_head->last_up ||
962         (*current_time == rrd->live_head->last_up &&
963          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
964         rrd_set_error("illegal attempt to update using time %ld when "
965                       "last update time is %ld (minimum one second step)",
966                       *current_time, rrd->live_head->last_up);
967         return -1;
968     }
969     return 0;
970 }
971
972 /*
973  * Update pdp_new by interpreting the updvals according to the DS type
974  * (COUNTER, GAUGE, etc.).
975  *
976  * Returns 0 on success, -1 on error.
977  */
978 static int update_pdp_prep(
979     rrd_t *rrd,
980     char **updvals,
981     rrd_value_t *pdp_new,
982     double interval)
983 {
984     unsigned long ds_idx;
985     int       ii;
986     char     *endptr;   /* used in the conversion */
987     double    rate;
988     char     *old_locale;
989     enum dst_en dst_idx;
990
991     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
992         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
993
994         /* make sure we do not build diffs with old last_ds values */
995         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
996             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
997             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
998         }
999
1000         /* NOTE: DST_CDEF should never enter this if block, because
1001          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1002          * accidently specified a value for the DST_CDEF. To handle this case,
1003          * an extra check is required. */
1004
1005         if ((updvals[ds_idx + 1][0] != 'U') &&
1006             (dst_idx != DST_CDEF) &&
1007             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1008             rate = DNAN;
1009
1010             /* pdp_new contains rate * time ... eg the bytes transferred during
1011              * the interval. Doing it this way saves a lot of math operations
1012              */
1013             switch (dst_idx) {
1014             case DST_COUNTER:
1015             case DST_DERIVE:
1016                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1017                     if ((updvals[ds_idx + 1][ii] < '0'
1018                          || updvals[ds_idx + 1][ii] > '9')
1019                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1020                         rrd_set_error("not a simple integer: '%s'",
1021                                       updvals[ds_idx + 1]);
1022                         return -1;
1023                     }
1024                 }
1025                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1026                     pdp_new[ds_idx] =
1027                         rrd_diff(updvals[ds_idx + 1],
1028                                  rrd->pdp_prep[ds_idx].last_ds);
1029                     if (dst_idx == DST_COUNTER) {
1030                         /* simple overflow catcher. This will fail
1031                          * terribly for non 32 or 64 bit counters
1032                          * ... are there any others in SNMP land?
1033                          */
1034                         if (pdp_new[ds_idx] < (double) 0.0)
1035                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1036                         if (pdp_new[ds_idx] < (double) 0.0)
1037                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1038                     }
1039                     rate = pdp_new[ds_idx] / interval;
1040                 } else {
1041                     pdp_new[ds_idx] = DNAN;
1042                 }
1043                 break;
1044             case DST_ABSOLUTE:
1045                 old_locale = setlocale(LC_NUMERIC, "C");
1046                 errno = 0;
1047                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1048                 setlocale(LC_NUMERIC, old_locale);
1049                 if (errno > 0) {
1050                     rrd_set_error("converting '%s' to float: %s",
1051                                   updvals[ds_idx + 1], rrd_strerror(errno));
1052                     return -1;
1053                 };
1054                 if (endptr[0] != '\0') {
1055                     rrd_set_error
1056                         ("conversion of '%s' to float not complete: tail '%s'",
1057                          updvals[ds_idx + 1], endptr);
1058                     return -1;
1059                 }
1060                 rate = pdp_new[ds_idx] / interval;
1061                 break;
1062             case DST_GAUGE:
1063                 errno = 0;
1064                 old_locale = setlocale(LC_NUMERIC, "C");
1065                 pdp_new[ds_idx] =
1066                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1067                 setlocale(LC_NUMERIC, old_locale);
1068                 if (errno) {
1069                     rrd_set_error("converting '%s' to float: %s",
1070                                   updvals[ds_idx + 1], rrd_strerror(errno));
1071                     return -1;
1072                 };
1073                 if (endptr[0] != '\0') {
1074                     rrd_set_error
1075                         ("conversion of '%s' to float not complete: tail '%s'",
1076                          updvals[ds_idx + 1], endptr);
1077                     return -1;
1078                 }
1079                 rate = pdp_new[ds_idx] / interval;
1080                 break;
1081             default:
1082                 rrd_set_error("rrd contains unknown DS type : '%s'",
1083                               rrd->ds_def[ds_idx].dst);
1084                 return -1;
1085             }
1086             /* break out of this for loop if the error string is set */
1087             if (rrd_test_error()) {
1088                 return -1;
1089             }
1090             /* make sure pdp_temp is neither too large or too small
1091              * if any of these occur it becomes unknown ...
1092              * sorry folks ... */
1093             if (!isnan(rate) &&
1094                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1095                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1096                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1097                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1098                 pdp_new[ds_idx] = DNAN;
1099             }
1100         } else {
1101             /* no news is news all the same */
1102             pdp_new[ds_idx] = DNAN;
1103         }
1104
1105
1106         /* make a copy of the command line argument for the next run */
1107 #ifdef DEBUG
1108         fprintf(stderr, "prep ds[%lu]\t"
1109                 "last_arg '%s'\t"
1110                 "this_arg '%s'\t"
1111                 "pdp_new %10.2f\n",
1112                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113                 pdp_new[ds_idx]);
1114 #endif
1115         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1116                 LAST_DS_LEN - 1);
1117         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1118     }
1119     return 0;
1120 }
1121
1122 /*
1123  * How many PDP steps have elapsed since the last update? Returns the answer,
1124  * and stores the time between the last update and the last PDP in pre_time,
1125  * and the time between the last PDP and the current time in post_int.
1126  */
1127 static int calculate_elapsed_steps(
1128     rrd_t *rrd,
1129     unsigned long current_time,
1130     unsigned long current_time_usec,
1131     double interval,
1132     double *pre_int,
1133     double *post_int,
1134     unsigned long *proc_pdp_cnt)
1135 {
1136     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1137     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1138                                  * time */
1139     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1140                                  * when it was last updated */
1141     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1142
1143     /* when was the current pdp started */
1144     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1145     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1146
1147     /* when did the last pdp_st occur */
1148     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1149     occu_pdp_st = current_time - occu_pdp_age;
1150
1151     if (occu_pdp_st > proc_pdp_st) {
1152         /* OK we passed the pdp_st moment */
1153         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1154                                                                      * occurred before the latest
1155                                                                      * pdp_st moment*/
1156         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1157         *post_int = occu_pdp_age;   /* how much after it */
1158         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1159     } else {
1160         *pre_int = interval;
1161         *post_int = 0;
1162     }
1163
1164     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1165
1166 #ifdef DEBUG
1167     printf("proc_pdp_age %lu\t"
1168            "proc_pdp_st %lu\t"
1169            "occu_pfp_age %lu\t"
1170            "occu_pdp_st %lu\t"
1171            "int %lf\t"
1172            "pre_int %lf\t"
1173            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1174            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1175 #endif
1176
1177     /* compute the number of elapsed pdp_st moments */
1178     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1179 }
1180
1181 /*
1182  * Increment the PDP values by the values in pdp_new, or else initialize them.
1183  */
1184 static void simple_update(
1185     rrd_t *rrd,
1186     double interval,
1187     rrd_value_t *pdp_new)
1188 {
1189     int       i;
1190
1191     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1192         if (isnan(pdp_new[i])) {
1193             /* this is not really accurate if we use subsecond data arrival time
1194                should have thought of it when going subsecond resolution ...
1195                sorry next format change we will have it! */
1196             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1197                 floor(interval);
1198         } else {
1199             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1200                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1201             } else {
1202                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1203             }
1204         }
1205 #ifdef DEBUG
1206         fprintf(stderr,
1207                 "NO PDP  ds[%i]\t"
1208                 "value %10.2f\t"
1209                 "unkn_sec %5lu\n",
1210                 i,
1211                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1212                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1213 #endif
1214     }
1215 }
1216
1217 /*
1218  * Call process_pdp_st for each DS.
1219  *
1220  * Returns 0 on success, -1 on error.
1221  */
1222 static int process_all_pdp_st(
1223     rrd_t *rrd,
1224     double interval,
1225     double pre_int,
1226     double post_int,
1227     unsigned long elapsed_pdp_st,
1228     rrd_value_t *pdp_new,
1229     rrd_value_t *pdp_temp)
1230 {
1231     unsigned long ds_idx;
1232
1233     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1234        rate*seconds which occurred up to the last run.
1235        pdp_new[] contains rate*seconds from the latest run.
1236        pdp_temp[] will contain the rate for cdp */
1237
1238     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1239         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1240                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1241                            pdp_new, pdp_temp) == -1) {
1242             return -1;
1243         }
1244 #ifdef DEBUG
1245         fprintf(stderr, "PDP UPD ds[%lu]\t"
1246                 "elapsed_pdp_st %lu\t"
1247                 "pdp_temp %10.2f\t"
1248                 "new_prep %10.2f\t"
1249                 "new_unkn_sec %5lu\n",
1250                 ds_idx,
1251                 elapsed_pdp_st,
1252                 pdp_temp[ds_idx],
1253                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1254                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1255 #endif
1256     }
1257     return 0;
1258 }
1259
1260 /*
1261  * Process an update that occurs after one of the PDP moments.
1262  * Increments the PDP value, sets NAN if time greater than the
1263  * heartbeats have elapsed, processes CDEFs.
1264  *
1265  * Returns 0 on success, -1 on error.
1266  */
1267 static int process_pdp_st(
1268     rrd_t *rrd,
1269     unsigned long ds_idx,
1270     double interval,
1271     double pre_int,
1272     double post_int,
1273     long diff_pdp_st, /* number of seconds in full steps passed since last update */
1274     rrd_value_t *pdp_new,
1275     rrd_value_t *pdp_temp)
1276 {
1277     int       i;
1278
1279     /* update pdp_prep to the current pdp_st. */
1280     double    pre_unknown = 0.0;
1281     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1282     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1283
1284     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1285
1286     rpnstack_init(&rpnstack);
1287
1288
1289     if (isnan(pdp_new[ds_idx])) {
1290         /* a final bit of unknown to be added before calculation
1291            we use a temporary variable for this so that we
1292            don't have to turn integer lines before using the value */
1293         pre_unknown = pre_int;
1294     } else {
1295         if (isnan(scratch[PDP_val].u_val)) {
1296             scratch[PDP_val].u_val = 0;
1297         }
1298         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1299     }
1300
1301     /* if too much of the pdp_prep is unknown we dump it */
1302     /* if the interval is larger thatn mrhb we get NAN */
1303     if ((interval > mrhb) ||
1304         (rrd->stat_head->pdp_step/2.0 < (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1305         pdp_temp[ds_idx] = DNAN;
1306     } else {
1307         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1308             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1309              pre_unknown);
1310     }
1311
1312     /* process CDEF data sources; remember each CDEF DS can
1313      * only reference other DS with a lower index number */
1314     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1315         rpnp_t   *rpnp;
1316
1317         rpnp =
1318             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1319         /* substitute data values for OP_VARIABLE nodes */
1320         for (i = 0; rpnp[i].op != OP_END; i++) {
1321             if (rpnp[i].op == OP_VARIABLE) {
1322                 rpnp[i].op = OP_NUMBER;
1323                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1324             }
1325         }
1326         /* run the rpn calculator */
1327         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1328             free(rpnp);
1329             rpnstack_free(&rpnstack);
1330             return -1;
1331         }
1332     }
1333
1334     /* make pdp_prep ready for the next run */
1335     if (isnan(pdp_new[ds_idx])) {
1336         /* this is not realy accurate if we use subsecond data arival time
1337            should have thought of it when going subsecond resolution ...
1338            sorry next format change we will have it! */
1339         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1340         scratch[PDP_val].u_val = DNAN;
1341     } else {
1342         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1343         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1344     }
1345     rpnstack_free(&rpnstack);
1346     return 0;
1347 }
1348
1349 /*
1350  * Iterate over all the RRAs for a given DS and:
1351  * 1. Decide whether to schedule a smooth later
1352  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1353  * 3. Update the CDP
1354  *
1355  * Returns 0 on success, -1 on error
1356  */
1357 static int update_all_cdp_prep(
1358     rrd_t *rrd,
1359     unsigned long *rra_step_cnt,
1360     unsigned long rra_begin,
1361     rrd_file_t *rrd_file,
1362     unsigned long elapsed_pdp_st,
1363     unsigned long proc_pdp_cnt,
1364     rrd_value_t **last_seasonal_coef,
1365     rrd_value_t **seasonal_coef,
1366     rrd_value_t *pdp_temp,
1367     unsigned long *rra_current,
1368     unsigned long *skip_update,
1369     int *schedule_smooth)
1370 {
1371     unsigned long rra_idx;
1372
1373     /* index into the CDP scratch array */
1374     enum cf_en current_cf;
1375     unsigned long rra_start;
1376
1377     /* number of rows to be updated in an RRA for a data value. */
1378     unsigned long start_pdp_offset;
1379
1380     rra_start = rra_begin;
1381     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1382         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1383         start_pdp_offset =
1384             rrd->rra_def[rra_idx].pdp_cnt -
1385             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1386         skip_update[rra_idx] = 0;
1387         if (start_pdp_offset <= elapsed_pdp_st) {
1388             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1389                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1390         } else {
1391             rra_step_cnt[rra_idx] = 0;
1392         }
1393
1394         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1395             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1396              * so that they will be correct for the next observed value; note that for
1397              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1398              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1399             if (rra_step_cnt[rra_idx] > 1) {
1400                 skip_update[rra_idx] = 1;
1401                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1402                                 elapsed_pdp_st, last_seasonal_coef);
1403                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1404                                 elapsed_pdp_st + 1, seasonal_coef);
1405             }
1406             /* periodically run a smoother for seasonal effects */
1407             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1408 #ifdef DEBUG
1409                 fprintf(stderr,
1410                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1411                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1412                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1413                         u_cnt);
1414 #endif
1415                 *schedule_smooth = 1;
1416             }
1417             *rra_current = rrd_tell(rrd_file);
1418         }
1419         if (rrd_test_error())
1420             return -1;
1421
1422         if (update_cdp_prep
1423             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1424              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1425              current_cf) == -1) {
1426             return -1;
1427         }
1428         rra_start +=
1429             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1430             sizeof(rrd_value_t);
1431     }
1432     return 0;
1433 }
1434
1435 /* 
1436  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1437  */
1438 static int do_schedule_smooth(
1439     rrd_t *rrd,
1440     unsigned long rra_idx,
1441     unsigned long elapsed_pdp_st)
1442 {
1443     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1444     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1445     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1446     unsigned long seasonal_smooth_idx =
1447         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1448     unsigned long *init_seasonal =
1449         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1450
1451     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1452      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1453      * really an RRA level, not a data source within RRA level parameter, but
1454      * the rra_def is read only for rrd_update (not flushed to disk). */
1455     if (*init_seasonal > BURNIN_CYCLES) {
1456         /* someone has no doubt invented a trick to deal with this wrap around,
1457          * but at least this code is clear. */
1458         if (seasonal_smooth_idx > cur_row) {
1459             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1460              * between PDP and CDP */
1461             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1462         }
1463         /* can't rely on negative numbers because we are working with
1464          * unsigned values */
1465         return (cur_row + elapsed_pdp_st >= row_cnt
1466                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1467     }
1468     /* mark off one of the burn-in cycles */
1469     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1470 }
1471
1472 /*
1473  * For a given RRA, iterate over the data sources and call the appropriate
1474  * consolidation function.
1475  *
1476  * Returns 0 on success, -1 on error.
1477  */
1478 static int update_cdp_prep(
1479     rrd_t *rrd,
1480     unsigned long elapsed_pdp_st,
1481     unsigned long start_pdp_offset,
1482     unsigned long *rra_step_cnt,
1483     int rra_idx,
1484     rrd_value_t *pdp_temp,
1485     rrd_value_t *last_seasonal_coef,
1486     rrd_value_t *seasonal_coef,
1487     int current_cf)
1488 {
1489     unsigned long ds_idx, cdp_idx;
1490
1491     /* update CDP_PREP areas */
1492     /* loop over data soures within each RRA */
1493     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1494
1495         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1496
1497         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1498             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1499                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1500                        elapsed_pdp_st, start_pdp_offset,
1501                        rrd->rra_def[rra_idx].pdp_cnt,
1502                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1503                        rra_idx, ds_idx);
1504         } else {
1505             /* Nothing to consolidate if there's one PDP per CDP. However, if
1506              * we've missed some PDPs, let's update null counters etc. */
1507             if (elapsed_pdp_st > 2) {
1508                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1509                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1510                           current_cf);
1511             }
1512         }
1513
1514         if (rrd_test_error())
1515             return -1;
1516     }                   /* endif data sources loop */
1517     return 0;
1518 }
1519
1520 /*
1521  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1522  * primary value, secondary value, and # of unknowns.
1523  */
1524 static void update_cdp(
1525     unival *scratch,
1526     int current_cf,
1527     rrd_value_t pdp_temp_val,
1528     unsigned long rra_step_cnt,
1529     unsigned long elapsed_pdp_st,
1530     unsigned long start_pdp_offset,
1531     unsigned long pdp_cnt,
1532     rrd_value_t xff,
1533     int i,
1534     int ii)
1535 {
1536     /* shorthand variables */
1537     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1538     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1539     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1540     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1541
1542     if (rra_step_cnt) {
1543         /* If we are in this block, as least 1 CDP value will be written to
1544          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1545          * to be written, then the "fill in" value is the CDP_secondary_val
1546          * entry. */
1547         if (isnan(pdp_temp_val)) {
1548             *cdp_unkn_pdp_cnt += start_pdp_offset;
1549             *cdp_secondary_val = DNAN;
1550         } else {
1551             /* CDP_secondary value is the RRA "fill in" value for intermediary
1552              * CDP data entries. No matter the CF, the value is the same because
1553              * the average, max, min, and last of a list of identical values is
1554              * the same, namely, the value itself. */
1555             *cdp_secondary_val = pdp_temp_val;
1556         }
1557
1558         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1559             *cdp_primary_val = DNAN;
1560             if (current_cf == CF_AVERAGE) {
1561                 *cdp_val =
1562                     initialize_average_carry_over(pdp_temp_val,
1563                                                   elapsed_pdp_st,
1564                                                   start_pdp_offset, pdp_cnt);
1565             } else {
1566                 *cdp_val = pdp_temp_val;
1567             }
1568         } else {
1569             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1570                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1571         }               /* endif meets xff value requirement for a valid value */
1572         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1573          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1574         if (isnan(pdp_temp_val))
1575             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1576         else
1577             *cdp_unkn_pdp_cnt = 0;
1578     } else {            /* rra_step_cnt[i]  == 0 */
1579
1580 #ifdef DEBUG
1581         if (isnan(*cdp_val)) {
1582             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1583                     i, ii);
1584         } else {
1585             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1586                     i, ii, *cdp_val);
1587         }
1588 #endif
1589         if (isnan(pdp_temp_val)) {
1590             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1591         } else {
1592             *cdp_val =
1593                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1594                                   current_cf, i, ii);
1595         }
1596     }
1597 }
1598
1599 /*
1600  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1601  * on the type of consolidation function.
1602  */
1603 static void initialize_cdp_val(
1604     unival *scratch,
1605     int current_cf,
1606     rrd_value_t pdp_temp_val,
1607     unsigned long elapsed_pdp_st,
1608     unsigned long start_pdp_offset,
1609     unsigned long pdp_cnt)
1610 {
1611     rrd_value_t cum_val, cur_val;
1612
1613     switch (current_cf) {
1614     case CF_AVERAGE:
1615         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1616         cur_val = IFDNAN(pdp_temp_val, 0.0);
1617         scratch[CDP_primary_val].u_val =
1618             (cum_val + cur_val * start_pdp_offset) /
1619             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1620         scratch[CDP_val].u_val =
1621             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1622                                           start_pdp_offset, pdp_cnt);
1623         break;
1624     case CF_MAXIMUM:
1625         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1626         cur_val = IFDNAN(pdp_temp_val, -DINF);
1627 #if 0
1628 #ifdef DEBUG
1629         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1630             fprintf(stderr,
1631                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1632                     i, ii);
1633             exit(-1);
1634         }
1635 #endif
1636 #endif
1637         if (cur_val > cum_val)
1638             scratch[CDP_primary_val].u_val = cur_val;
1639         else
1640             scratch[CDP_primary_val].u_val = cum_val;
1641         /* initialize carry over value */
1642         scratch[CDP_val].u_val = pdp_temp_val;
1643         break;
1644     case CF_MINIMUM:
1645         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1646         cur_val = IFDNAN(pdp_temp_val, DINF);
1647 #if 0
1648 #ifdef DEBUG
1649         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1650             fprintf(stderr,
1651                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1652                     ii);
1653             exit(-1);
1654         }
1655 #endif
1656 #endif
1657         if (cur_val < cum_val)
1658             scratch[CDP_primary_val].u_val = cur_val;
1659         else
1660             scratch[CDP_primary_val].u_val = cum_val;
1661         /* initialize carry over value */
1662         scratch[CDP_val].u_val = pdp_temp_val;
1663         break;
1664     case CF_LAST:
1665     default:
1666         scratch[CDP_primary_val].u_val = pdp_temp_val;
1667         /* initialize carry over value */
1668         scratch[CDP_val].u_val = pdp_temp_val;
1669         break;
1670     }
1671 }
1672
1673 /*
1674  * Update the consolidation function for Holt-Winters functions as
1675  * well as other functions that don't actually consolidate multiple
1676  * PDPs.
1677  */
1678 static void reset_cdp(
1679     rrd_t *rrd,
1680     unsigned long elapsed_pdp_st,
1681     rrd_value_t *pdp_temp,
1682     rrd_value_t *last_seasonal_coef,
1683     rrd_value_t *seasonal_coef,
1684     int rra_idx,
1685     int ds_idx,
1686     int cdp_idx,
1687     enum cf_en current_cf)
1688 {
1689     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1690
1691     switch (current_cf) {
1692     case CF_AVERAGE:
1693     default:
1694         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1695         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1696         break;
1697     case CF_SEASONAL:
1698     case CF_DEVSEASONAL:
1699         /* need to update cached seasonal values, so they are consistent
1700          * with the bulk update */
1701         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1702          * CDP_last_deviation are the same. */
1703         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1704         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1705         break;
1706     case CF_HWPREDICT:
1707     case CF_MHWPREDICT:
1708         /* need to update the null_count and last_null_count.
1709          * even do this for non-DNAN pdp_temp because the
1710          * algorithm is not learning from batch updates. */
1711         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1712         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1713         /* fall through */
1714     case CF_DEVPREDICT:
1715         scratch[CDP_primary_val].u_val = DNAN;
1716         scratch[CDP_secondary_val].u_val = DNAN;
1717         break;
1718     case CF_FAILURES:
1719         /* do not count missed bulk values as failures */
1720         scratch[CDP_primary_val].u_val = 0;
1721         scratch[CDP_secondary_val].u_val = 0;
1722         /* need to reset violations buffer.
1723          * could do this more carefully, but for now, just
1724          * assume a bulk update wipes away all violations. */
1725         erase_violations(rrd, cdp_idx, rra_idx);
1726         break;
1727     }
1728 }
1729
1730 static rrd_value_t initialize_average_carry_over(
1731     rrd_value_t pdp_temp_val,
1732     unsigned long elapsed_pdp_st,
1733     unsigned long start_pdp_offset,
1734     unsigned long pdp_cnt)
1735 {
1736     /* initialize carry over value */
1737     if (isnan(pdp_temp_val)) {
1738         return DNAN;
1739     }
1740     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1741 }
1742
1743 /*
1744  * Update or initialize a CDP value based on the consolidation
1745  * function.
1746  *
1747  * Returns the new value.
1748  */
1749 static rrd_value_t calculate_cdp_val(
1750     rrd_value_t cdp_val,
1751     rrd_value_t pdp_temp_val,
1752     unsigned long elapsed_pdp_st,
1753     int current_cf,
1754 #ifdef DEBUG
1755     int i,
1756     int ii
1757 #else
1758     int UNUSED(i),
1759     int UNUSED(ii)
1760 #endif
1761     )
1762 {
1763     if (isnan(cdp_val)) {
1764         if (current_cf == CF_AVERAGE) {
1765             pdp_temp_val *= elapsed_pdp_st;
1766         }
1767 #ifdef DEBUG
1768         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1769                 i, ii, pdp_temp_val);
1770 #endif
1771         return pdp_temp_val;
1772     }
1773     if (current_cf == CF_AVERAGE)
1774         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1775     if (current_cf == CF_MINIMUM)
1776         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1777     if (current_cf == CF_MAXIMUM)
1778         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1779
1780     return pdp_temp_val;
1781 }
1782
1783 /*
1784  * For each RRA, update the seasonal values and then call update_aberrant_CF
1785  * for each data source.
1786  *
1787  * Return 0 on success, -1 on error.
1788  */
1789 static int update_aberrant_cdps(
1790     rrd_t *rrd,
1791     rrd_file_t *rrd_file,
1792     unsigned long rra_begin,
1793     unsigned long *rra_current,
1794     unsigned long elapsed_pdp_st,
1795     rrd_value_t *pdp_temp,
1796     rrd_value_t **seasonal_coef)
1797 {
1798     unsigned long rra_idx, ds_idx, j;
1799
1800     /* number of PDP steps since the last update that
1801      * are assigned to the first CDP to be generated
1802      * since the last update. */
1803     unsigned short scratch_idx;
1804     unsigned long rra_start;
1805     enum cf_en current_cf;
1806
1807     /* this loop is only entered if elapsed_pdp_st < 3 */
1808     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1809          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1810         rra_start = rra_begin;
1811         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1812             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1813                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1814                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1815                     if (scratch_idx == CDP_primary_val) {
1816                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1817                                         elapsed_pdp_st + 1, seasonal_coef);
1818                     } else {
1819                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1820                                         elapsed_pdp_st + 2, seasonal_coef);
1821                     }
1822                     *rra_current = rrd_tell(rrd_file);
1823                 }
1824                 if (rrd_test_error())
1825                     return -1;
1826                 /* loop over data soures within each RRA */
1827                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1828                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1829                                        rra_idx * (rrd->stat_head->ds_cnt) +
1830                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1831                                        *seasonal_coef);
1832                 }
1833             }
1834             rra_start += rrd->rra_def[rra_idx].row_cnt
1835                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1836         }
1837     }
1838     return 0;
1839 }
1840
1841 /* 
1842  * Move sequentially through the file, writing one RRA at a time.  Note this
1843  * architecture divorces the computation of CDP with flushing updated RRA
1844  * entries to disk.
1845  *
1846  * Return 0 on success, -1 on error.
1847  */
1848 static int write_to_rras(
1849     rrd_t *rrd,
1850     rrd_file_t *rrd_file,
1851     unsigned long *rra_step_cnt,
1852     unsigned long rra_begin,
1853     unsigned long *rra_current,
1854     time_t current_time,
1855     unsigned long *skip_update,
1856     info_t **pcdp_summary)
1857 {
1858     unsigned long rra_idx;
1859     unsigned long rra_start;
1860     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1861     time_t    rra_time = 0; /* time of update for a RRA */
1862
1863     /* Ready to write to disk */
1864     rra_start = rra_begin;
1865     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1866         /* skip unless there's something to write */
1867         if (rra_step_cnt[rra_idx]) {
1868             /* write the first row */
1869 #ifdef DEBUG
1870             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1871 #endif
1872             rrd->rra_ptr[rra_idx].cur_row++;
1873             if (rrd->rra_ptr[rra_idx].cur_row >=
1874                 rrd->rra_def[rra_idx].row_cnt)
1875                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1876             /* position on the first row */
1877             rra_pos_tmp = rra_start +
1878                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1879                 sizeof(rrd_value_t);
1880             if (rra_pos_tmp != *rra_current) {
1881                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1882                     rrd_set_error("seek error in rrd");
1883                     return -1;
1884                 }
1885                 *rra_current = rra_pos_tmp;
1886             }
1887 #ifdef DEBUG
1888             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1889 #endif
1890             if (!skip_update[rra_idx]) {
1891                 if (*pcdp_summary != NULL) {
1892                     rra_time = (current_time - current_time
1893                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1894                                    rrd->stat_head->pdp_step))
1895                         -
1896                         ((rra_step_cnt[rra_idx] -
1897                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1898                          rrd->stat_head->pdp_step);
1899                 }
1900                 if (write_RRA_row
1901                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1902                      pcdp_summary, rra_time) == -1)
1903                     return -1;
1904             }
1905
1906             /* write other rows of the bulk update, if any */
1907             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1908                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1909                     rrd->rra_def[rra_idx].row_cnt) {
1910 #ifdef DEBUG
1911                     fprintf(stderr,
1912                             "Wraparound for RRA %s, %lu updates left\n",
1913                             rrd->rra_def[rra_idx].cf_nam,
1914                             rra_step_cnt[rra_idx] - 1);
1915 #endif
1916                     /* wrap */
1917                     rrd->rra_ptr[rra_idx].cur_row = 0;
1918                     /* seek back to beginning of current rra */
1919                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1920                         rrd_set_error("seek error in rrd");
1921                         return -1;
1922                     }
1923 #ifdef DEBUG
1924                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1925                             rrd_file->pos);
1926 #endif
1927                     *rra_current = rra_start;
1928                 }
1929                 if (!skip_update[rra_idx]) {
1930                     if (*pcdp_summary != NULL) {
1931                         rra_time = (current_time - current_time
1932                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1933                                        rrd->stat_head->pdp_step))
1934                             -
1935                             ((rra_step_cnt[rra_idx] -
1936                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1937                              rrd->stat_head->pdp_step);
1938                     }
1939                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1940                                       CDP_secondary_val, pcdp_summary,
1941                                       rra_time) == -1)
1942                         return -1;
1943                 }
1944             }
1945         }
1946         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1947             sizeof(rrd_value_t);
1948     }                   /* RRA LOOP */
1949
1950     return 0;
1951 }
1952
1953 /*
1954  * Write out one row of values (one value per DS) to the archive.
1955  *
1956  * Returns 0 on success, -1 on error.
1957  */
1958 static int write_RRA_row(
1959     rrd_file_t *rrd_file,
1960     rrd_t *rrd,
1961     unsigned long rra_idx,
1962     unsigned long *rra_current,
1963     unsigned short CDP_scratch_idx,
1964     info_t **pcdp_summary,
1965     time_t rra_time)
1966 {
1967     unsigned long ds_idx, cdp_idx;
1968     infoval   iv;
1969
1970     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1971         /* compute the cdp index */
1972         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1973 #ifdef DEBUG
1974         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1975                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1976                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1977 #endif
1978         if (*pcdp_summary != NULL) {
1979             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1980             /* append info to the return hash */
1981             *pcdp_summary = info_push(*pcdp_summary,
1982                                       sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1983                                                     rra_time,
1984                                                     rrd->rra_def[rra_idx].
1985                                                     cf_nam,
1986                                                     rrd->rra_def[rra_idx].
1987                                                     pdp_cnt,
1988                                                     rrd->ds_def[ds_idx].
1989                                                     ds_nam), RD_I_VAL, iv);
1990         }
1991         if (rrd_write(rrd_file,
1992                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1993                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1994             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1995             return -1;
1996         }
1997         *rra_current += sizeof(rrd_value_t);
1998     }
1999     return 0;
2000 }
2001
2002 /*
2003  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2004  *
2005  * Returns 0 on success, -1 otherwise
2006  */
2007 static int smooth_all_rras(
2008     rrd_t *rrd,
2009     rrd_file_t *rrd_file,
2010     unsigned long rra_begin)
2011 {
2012     unsigned long rra_start = rra_begin;
2013     unsigned long rra_idx;
2014
2015     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2016         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2017             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2018 #ifdef DEBUG
2019             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2020 #endif
2021             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2022             if (rrd_test_error())
2023                 return -1;
2024         }
2025         rra_start += rrd->rra_def[rra_idx].row_cnt
2026             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2027     }
2028     return 0;
2029 }
2030
2031 #ifndef HAVE_MMAP
2032 /*
2033  * Flush changes to disk (unless we're using mmap)
2034  *
2035  * Returns 0 on success, -1 otherwise
2036  */
2037 static int write_changes_to_disk(
2038     rrd_t *rrd,
2039     rrd_file_t *rrd_file,
2040     int version)
2041 {
2042     /* we just need to write back the live header portion now */
2043     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2044                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2045                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2046                  SEEK_SET) != 0) {
2047         rrd_set_error("seek rrd for live header writeback");
2048         return -1;
2049     }
2050     if (version >= 3) {
2051         if (rrd_write(rrd_file, rrd->live_head,
2052                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2053             rrd_set_error("rrd_write live_head to rrd");
2054             return -1;
2055         }
2056     } else {
2057         if (rrd_write(rrd_file, &rrd->live_head->last_up,
2058                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2059             rrd_set_error("rrd_write live_head to rrd");
2060             return -1;
2061         }
2062     }
2063
2064
2065     if (rrd_write(rrd_file, rrd->pdp_prep,
2066                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2067         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2068         rrd_set_error("rrd_write pdp_prep to rrd");
2069         return -1;
2070     }
2071
2072     if (rrd_write(rrd_file, rrd->cdp_prep,
2073                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2074                   rrd->stat_head->ds_cnt)
2075         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2076                       rrd->stat_head->ds_cnt)) {
2077
2078         rrd_set_error("rrd_write cdp_prep to rrd");
2079         return -1;
2080     }
2081
2082     if (rrd_write(rrd_file, rrd->rra_ptr,
2083                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2084         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2085         rrd_set_error("rrd_write rra_ptr to rrd");
2086         return -1;
2087     }
2088     return 0;
2089 }
2090 #endif