fixed indenting
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     unsigned long *rra_current,
99     time_t *current_time,
100     unsigned long *current_time_usec,
101     rrd_value_t *pdp_temp,
102     rrd_value_t *pdp_new,
103     unsigned long *rra_step_cnt,
104     char **updvals,
105     long *tmpl_idx,
106     unsigned long tmpl_cnt,
107     info_t **pcdp_summary,
108     int version,
109     unsigned long *skip_update,
110     int *schedule_smooth);
111
112 static int parse_ds(
113     rrd_t *rrd,
114     char **updvals,
115     long *tmpl_idx,
116     char *input,
117     unsigned long tmpl_cnt,
118     time_t *current_time,
119     unsigned long *current_time_usec,
120     int version);
121
122 static int get_time_from_reading(
123     rrd_t *rrd,
124     char timesyntax,
125     char **updvals,
126     time_t *current_time,
127     unsigned long *current_time_usec,
128     int version);
129
130 static int update_pdp_prep(
131     rrd_t *rrd,
132     char **updvals,
133     rrd_value_t *pdp_new,
134     double interval);
135
136 static int calculate_elapsed_steps(
137     rrd_t *rrd,
138     unsigned long current_time,
139     unsigned long current_time_usec,
140     double interval,
141     double *pre_int,
142     double *post_int,
143     unsigned long *proc_pdp_cnt);
144
145 static void simple_update(
146     rrd_t *rrd,
147     double interval,
148     rrd_value_t *pdp_new);
149
150 static int process_all_pdp_st(
151     rrd_t *rrd,
152     double interval,
153     double pre_int,
154     double post_int,
155     unsigned long elapsed_pdp_st,
156     rrd_value_t *pdp_new,
157     rrd_value_t *pdp_temp);
158
159 static int process_pdp_st(
160     rrd_t *rrd,
161     unsigned long ds_idx,
162     double interval,
163     double pre_int,
164     double post_int,
165     long diff_pdp_st,
166     rrd_value_t *pdp_new,
167     rrd_value_t *pdp_temp);
168
169 static int update_all_cdp_prep(
170     rrd_t *rrd,
171     unsigned long *rra_step_cnt,
172     unsigned long rra_begin,
173     rrd_file_t *rrd_file,
174     unsigned long elapsed_pdp_st,
175     unsigned long proc_pdp_cnt,
176     rrd_value_t **last_seasonal_coef,
177     rrd_value_t **seasonal_coef,
178     rrd_value_t *pdp_temp,
179     unsigned long *rra_current,
180     unsigned long *skip_update,
181     int *schedule_smooth);
182
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
187
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
198
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
210
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
218
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
229
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
235
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
243
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long *rra_current,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     unsigned long *rra_current,
259     time_t current_time,
260     unsigned long *skip_update,
261     info_t **pcdp_summary);
262
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned long *rra_current,
268     unsigned short CDP_scratch_idx,
269     info_t **pcdp_summary,
270     time_t rra_time);
271
272 static int smooth_all_rras(
273     rrd_t *rrd,
274     rrd_file_t *rrd_file,
275     unsigned long rra_begin);
276
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279     rrd_t *rrd,
280     rrd_file_t *rrd_file,
281     int version);
282 #endif
283
284 /*
285  * normalize time as returned by gettimeofday. usec part must
286  * be always >= 0
287  */
288 static inline void normalize_time(
289     struct timeval *t)
290 {
291     if (t->tv_usec < 0) {
292         t->tv_sec--;
293         t->tv_usec += 1e6L;
294     }
295 }
296
297 /*
298  * Sets current_time and current_time_usec based on the current time.
299  * current_time_usec is set to 0 if the version number is 1 or 2.
300  */
301 static inline void initialize_time(
302     time_t *current_time,
303     unsigned long *current_time_usec,
304     int version)
305 {
306     struct timeval tmp_time;    /* used for time conversion */
307
308     gettimeofday(&tmp_time, 0);
309     normalize_time(&tmp_time);
310     *current_time = tmp_time.tv_sec;
311     if (version >= 3) {
312         *current_time_usec = tmp_time.tv_usec;
313     } else {
314         *current_time_usec = 0;
315     }
316 }
317
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319
320 info_t   *rrd_update_v(
321     int argc,
322     char **argv)
323 {
324     char     *tmplt = NULL;
325     info_t   *result = NULL;
326     infoval   rc;
327     struct option long_options[] = {
328         {"template", required_argument, 0, 't'},
329         {0, 0, 0, 0}
330     };
331
332     rc.u_int = -1;
333     optind = 0;
334     opterr = 0;         /* initialize getopt */
335
336     while (1) {
337         int       option_index = 0;
338         int       opt;
339
340         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341
342         if (opt == EOF)
343             break;
344
345         switch (opt) {
346         case 't':
347             tmplt = optarg;
348             break;
349
350         case '?':
351             rrd_set_error("unknown option '%s'", argv[optind - 1]);
352             goto end_tag;
353         }
354     }
355
356     /* need at least 2 arguments: filename, data. */
357     if (argc - optind < 2) {
358         rrd_set_error("Not enough arguments");
359         goto end_tag;
360     }
361     rc.u_int = 0;
362     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363     rc.u_int = _rrd_update(argv[optind], tmplt,
364                            argc - optind - 1,
365                            (const char **) (argv + optind + 1), result);
366     result->value.u_int = rc.u_int;
367   end_tag:
368     return result;
369 }
370
371 int rrd_update(
372     int argc,
373     char **argv)
374 {
375     struct option long_options[] = {
376         {"template", required_argument, 0, 't'},
377         {0, 0, 0, 0}
378     };
379     int       option_index = 0;
380     int       opt;
381     char     *tmplt = NULL;
382     int       rc = -1;
383
384     optind = 0;
385     opterr = 0;         /* initialize getopt */
386
387     while (1) {
388         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389
390         if (opt == EOF)
391             break;
392
393         switch (opt) {
394         case 't':
395             tmplt = strdup(optarg);
396             break;
397
398         case '?':
399             rrd_set_error("unknown option '%s'", argv[optind - 1]);
400             goto out;
401         }
402     }
403
404     /* need at least 2 arguments: filename, data. */
405     if (argc - optind < 2) {
406         rrd_set_error("Not enough arguments");
407         goto out;
408     }
409
410     rc = rrd_update_r(argv[optind], tmplt,
411                       argc - optind - 1, (const char **) (argv + optind + 1));
412   out:
413     free(tmplt);
414     return rc;
415 }
416
417 int rrd_update_r(
418     const char *filename,
419     const char *tmplt,
420     int argc,
421     const char **argv)
422 {
423     return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
425
426 int _rrd_update(
427     const char *filename,
428     const char *tmplt,
429     int argc,
430     const char **argv,
431     info_t *pcdp_summary)
432 {
433
434     int       arg_i = 2;
435
436     unsigned long rra_begin;    /* byte pointer to the rra
437                                  * area in the rrd file.  this
438                                  * pointer never changes value */
439     unsigned long rra_current;  /* byte pointer to the current write
440                                  * spot in the rrd file. */
441     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
442                              * to the existing entry */
443     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
444                              * to the cdp values */
445
446     long     *tmpl_idx; /* index representing the settings
447                          * transported by the tmplt index */
448     unsigned long tmpl_cnt = 2; /* time and data */
449     rrd_t     rrd;
450     time_t    current_time = 0;
451     unsigned long current_time_usec = 0;    /* microseconds part of current time */
452     char    **updvals;
453     int       schedule_smooth = 0;
454
455     /* number of elapsed PDP steps since last update */
456     unsigned long *rra_step_cnt = NULL;
457
458     int       version;  /* rrd version */
459     rrd_file_t *rrd_file;
460     char     *arg_copy; /* for processing the argv */
461     unsigned long *skip_update; /* RRAs to advance but not write */
462
463     /* need at least 1 arguments: data. */
464     if (argc < 1) {
465         rrd_set_error("Not enough arguments");
466         goto err_out;
467     }
468
469     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470         goto err_free;
471     }
472     /* We are now at the beginning of the rra's */
473     rra_current = rra_begin = rrd_file->header_len;
474
475     version = atoi(rrd.stat_head->version);
476
477     initialize_time(&current_time, &current_time_usec, version);
478
479     /* get exclusive lock to whole file.
480      * lock gets removed when we close the file.
481      */
482     if (LockRRD(rrd_file->fd) != 0) {
483         rrd_set_error("could not lock RRD");
484         goto err_close;
485     }
486
487     if (allocate_data_structures(&rrd, &updvals,
488                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489                                  &rra_step_cnt, &skip_update,
490                                  &pdp_new) == -1) {
491         goto err_close;
492     }
493
494     /* loop through the arguments. */
495     for (arg_i = 0; arg_i < argc; arg_i++) {
496         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497             rrd_set_error("failed duplication argv entry");
498             break;
499         }
500         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501                         &current_time, &current_time_usec, pdp_temp, pdp_new,
502                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503                         &pcdp_summary, version, skip_update,
504                         &schedule_smooth) == -1) {
505             free(arg_copy);
506             break;
507         }
508         free(arg_copy);
509     }
510
511     free(rra_step_cnt);
512
513     /* if we got here and if there is an error and if the file has not been
514      * written to, then close things up and return. */
515     if (rrd_test_error()) {
516         goto err_free_structures;
517     }
518 #ifndef HAVE_MMAP
519     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
520         goto err_free_structures;
521     }
522 #endif
523
524     /* calling the smoothing code here guarantees at most one smoothing
525      * operation per rrd_update call. Unfortunately, it is possible with bulk
526      * updates, or a long-delayed update for smoothing to occur off-schedule.
527      * This really isn't critical except during the burn-in cycles. */
528     if (schedule_smooth) {
529         smooth_all_rras(&rrd, rrd_file, rra_begin);
530     }
531
532 /*    rrd_dontneed(rrd_file,&rrd); */
533     rrd_free(&rrd);
534     rrd_close(rrd_file);
535
536     free(pdp_new);
537     free(tmpl_idx);
538     free(pdp_temp);
539     free(skip_update);
540     free(updvals);
541     return 0;
542
543   err_free_structures:
544     free(pdp_new);
545     free(tmpl_idx);
546     free(pdp_temp);
547     free(skip_update);
548     free(updvals);
549   err_close:
550     rrd_close(rrd_file);
551   err_free:
552     rrd_free(&rrd);
553   err_out:
554     return -1;
555 }
556
557 /*
558  * get exclusive lock to whole file.
559  * lock gets removed when we close the file
560  *
561  * returns 0 on success
562  */
563 int LockRRD(
564     int in_file)
565 {
566     int       rcstat;
567
568     {
569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
570         struct _stat st;
571
572         if (_fstat(in_file, &st) == 0) {
573             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
574         } else {
575             rcstat = -1;
576         }
577 #else
578         struct flock lock;
579
580         lock.l_type = F_WRLCK;  /* exclusive write lock */
581         lock.l_len = 0; /* whole file */
582         lock.l_start = 0;   /* start of file */
583         lock.l_whence = SEEK_SET;   /* end of file */
584
585         rcstat = fcntl(in_file, F_SETLK, &lock);
586 #endif
587     }
588
589     return (rcstat);
590 }
591
592 /*
593  * Allocate some important arrays used, and initialize the template.
594  *
595  * When it returns, either all of the structures are allocated
596  * or none of them are.
597  *
598  * Returns 0 on success, -1 on error.
599  */
600 static int allocate_data_structures(
601     rrd_t *rrd,
602     char ***updvals,
603     rrd_value_t **pdp_temp,
604     const char *tmplt,
605     long **tmpl_idx,
606     unsigned long *tmpl_cnt,
607     unsigned long **rra_step_cnt,
608     unsigned long **skip_update,
609     rrd_value_t **pdp_new)
610 {
611     unsigned  i, ii;
612     if ((*updvals = (char **) malloc(sizeof(char *)
613                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
614         rrd_set_error("allocating updvals pointer array.");
615         return -1;
616     }
617     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
618                                             * rrd->stat_head->ds_cnt)) ==
619         NULL) {
620         rrd_set_error("allocating pdp_temp.");
621         goto err_free_updvals;
622     }
623     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
624                                                  *
625                                                  rrd->stat_head->rra_cnt)) ==
626         NULL) {
627         rrd_set_error("allocating skip_update.");
628         goto err_free_pdp_temp;
629     }
630     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
631                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
632         rrd_set_error("allocating tmpl_idx.");
633         goto err_free_skip_update;
634     }
635     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
636                                                   *
637                                                   (rrd->stat_head->
638                                                    rra_cnt))) == NULL) {
639         rrd_set_error("allocating rra_step_cnt.");
640         goto err_free_tmpl_idx;
641     }
642
643     /* initialize tmplt redirector */
644     /* default config example (assume DS 1 is a CDEF DS)
645        tmpl_idx[0] -> 0; (time)
646        tmpl_idx[1] -> 1; (DS 0)
647        tmpl_idx[2] -> 3; (DS 2)
648        tmpl_idx[3] -> 4; (DS 3) */
649     (*tmpl_idx)[0] = 0; /* time */
650     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
651         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
652             (*tmpl_idx)[ii++] = i;
653     }
654     *tmpl_cnt = ii;
655
656     if (tmplt != NULL) {
657         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
658             goto err_free_rra_step_cnt;
659         }
660     }
661
662     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
663                                            * rrd->stat_head->ds_cnt)) == NULL) {
664         rrd_set_error("allocating pdp_new.");
665         goto err_free_rra_step_cnt;
666     }
667
668     return 0;
669
670   err_free_rra_step_cnt:
671     free(*rra_step_cnt);
672   err_free_tmpl_idx:
673     free(*tmpl_idx);
674   err_free_skip_update:
675     free(*skip_update);
676   err_free_pdp_temp:
677     free(*pdp_temp);
678   err_free_updvals:
679     free(*updvals);
680     return -1;
681 }
682
683 /*
684  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
685  *
686  * Returns 0 on success.
687  */
688 static int parse_template(
689     rrd_t *rrd,
690     const char *tmplt,
691     unsigned long *tmpl_cnt,
692     long *tmpl_idx)
693 {
694     char     *dsname, *tmplt_copy;
695     unsigned int tmpl_len, i;
696     int       ret = 0;
697
698     *tmpl_cnt = 1;      /* the first entry is the time */
699
700     /* we should work on a writeable copy here */
701     if ((tmplt_copy = strdup(tmplt)) == NULL) {
702         rrd_set_error("error copying tmplt '%s'", tmplt);
703         ret = -1;
704         goto out;
705     }
706
707     dsname = tmplt_copy;
708     tmpl_len = strlen(tmplt_copy);
709     for (i = 0; i <= tmpl_len; i++) {
710         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
711             tmplt_copy[i] = '\0';
712             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
713                 rrd_set_error("tmplt contains more DS definitions than RRD");
714                 ret = -1;
715                 goto out_free_tmpl_copy;
716             }
717             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
718                 rrd_set_error("unknown DS name '%s'", dsname);
719                 ret = -1;
720                 goto out_free_tmpl_copy;
721             }
722             /* go to the next entry on the tmplt_copy */
723             if (i < tmpl_len)
724                 dsname = &tmplt_copy[i + 1];
725         }
726     }
727   out_free_tmpl_copy:
728     free(tmplt_copy);
729   out:
730     return ret;
731 }
732
733 /*
734  * Parse an update string, updates the primary data points (PDPs)
735  * and consolidated data points (CDPs), and writes changes to the RRAs.
736  *
737  * Returns 0 on success, -1 on error.
738  */
739 static int process_arg(
740     char *step_start,
741     rrd_t *rrd,
742     rrd_file_t *rrd_file,
743     unsigned long rra_begin,
744     unsigned long *rra_current,
745     time_t *current_time,
746     unsigned long *current_time_usec,
747     rrd_value_t *pdp_temp,
748     rrd_value_t *pdp_new,
749     unsigned long *rra_step_cnt,
750     char **updvals,
751     long *tmpl_idx,
752     unsigned long tmpl_cnt,
753     info_t **pcdp_summary,
754     int version,
755     unsigned long *skip_update,
756     int *schedule_smooth)
757 {
758     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
759
760     /* a vector of future Holt-Winters seasonal coefs */
761     unsigned long elapsed_pdp_st;
762
763     double    interval, pre_int, post_int;  /* interval between this and
764                                              * the last run */
765     unsigned long proc_pdp_cnt;
766     unsigned long rra_start;
767
768     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769                  current_time, current_time_usec, version) == -1) {
770         return -1;
771     }
772     /* seek to the beginning of the rra's */
773     if (*rra_current != rra_begin) {
774 #ifndef HAVE_MMAP
775         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
776             rrd_set_error("seek error in rrd");
777             return -1;
778         }
779 #endif
780         *rra_current = rra_begin;
781     }
782     rra_start = rra_begin;
783
784     interval = (double) (*current_time - rrd->live_head->last_up)
785         + (double) ((long) *current_time_usec -
786                     (long) rrd->live_head->last_up_usec) / 1e6f;
787
788     /* process the data sources and update the pdp_prep 
789      * area accordingly */
790     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
791         return -1;
792     }
793
794     elapsed_pdp_st = calculate_elapsed_steps(rrd,
795                                              *current_time,
796                                              *current_time_usec, interval,
797                                              &pre_int, &post_int,
798                                              &proc_pdp_cnt);
799
800     /* has a pdp_st moment occurred since the last run ? */
801     if (elapsed_pdp_st == 0) {
802         /* no we have not passed a pdp_st moment. therefore update is simple */
803         simple_update(rrd, interval, pdp_new);
804     } else {
805         /* an pdp_st has occurred. */
806         if (process_all_pdp_st(rrd, interval,
807                                pre_int, post_int,
808                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
809             return -1;
810         }
811         if (update_all_cdp_prep(rrd, rra_step_cnt,
812                                 rra_begin, rrd_file,
813                                 elapsed_pdp_st,
814                                 proc_pdp_cnt,
815                                 &last_seasonal_coef,
816                                 &seasonal_coef,
817                                 pdp_temp, rra_current,
818                                 skip_update, schedule_smooth) == -1) {
819             goto err_free_coefficients;
820         }
821         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
822                                  elapsed_pdp_st, pdp_temp,
823                                  &seasonal_coef) == -1) {
824             goto err_free_coefficients;
825         }
826         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
827                           rra_current, *current_time, skip_update,
828                           pcdp_summary) == -1) {
829             goto err_free_coefficients;
830         }
831     }                   /* endif a pdp_st has occurred */
832     rrd->live_head->last_up = *current_time;
833     rrd->live_head->last_up_usec = *current_time_usec;
834
835     free(seasonal_coef);
836     free(last_seasonal_coef);
837     return 0;
838
839   err_free_coefficients:
840     free(seasonal_coef);
841     free(last_seasonal_coef);
842     return -1;
843 }
844
845 /*
846  * Parse a DS string (time + colon-separated values), storing the
847  * results in current_time, current_time_usec, and updvals.
848  *
849  * Returns 0 on success, -1 on error.
850  */
851 static int parse_ds(
852     rrd_t *rrd,
853     char **updvals,
854     long *tmpl_idx,
855     char *input,
856     unsigned long tmpl_cnt,
857     time_t *current_time,
858     unsigned long *current_time_usec,
859     int version)
860 {
861     char     *p;
862     unsigned long i;
863     char      timesyntax;
864
865     updvals[0] = input;
866     /* initialize all ds input to unknown except the first one
867        which has always got to be set */
868     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
869         updvals[i] = "U";
870
871     /* separate all ds elements; first must be examined separately
872        due to alternate time syntax */
873     if ((p = strchr(input, '@')) != NULL) {
874         timesyntax = '@';
875     } else if ((p = strchr(input, ':')) != NULL) {
876         timesyntax = ':';
877     } else {
878         rrd_set_error("expected timestamp not found in data source from %s",
879                       input);
880         return -1;
881     }
882     *p = '\0';
883     i = 1;
884     updvals[tmpl_idx[i++]] = p + 1;
885     while (*(++p)) {
886         if (*p == ':') {
887             *p = '\0';
888             if (i < tmpl_cnt) {
889                 updvals[tmpl_idx[i++]] = p + 1;
890             }
891         }
892     }
893
894     if (i != tmpl_cnt) {
895         rrd_set_error("expected %lu data source readings (got %lu) from %s",
896                       tmpl_cnt - 1, i, input);
897         return -1;
898     }
899
900     if (get_time_from_reading(rrd, timesyntax, updvals,
901                               current_time, current_time_usec,
902                               version) == -1) {
903         return -1;
904     }
905     return 0;
906 }
907
908 /*
909  * Parse the time in a DS string, store it in current_time and 
910  * current_time_usec and verify that it's later than the last
911  * update for this DS.
912  *
913  * Returns 0 on success, -1 on error.
914  */
915 static int get_time_from_reading(
916     rrd_t *rrd,
917     char timesyntax,
918     char **updvals,
919     time_t *current_time,
920     unsigned long *current_time_usec,
921     int version)
922 {
923     double    tmp;
924     char     *parsetime_error = NULL;
925     char     *old_locale;
926     struct rrd_time_value ds_tv;
927     struct timeval tmp_time;    /* used for time conversion */
928
929     /* get the time from the reading ... handle N */
930     if (timesyntax == '@') {    /* at-style */
931         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
932             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
933             return -1;
934         }
935         if (ds_tv.type == RELATIVE_TO_END_TIME ||
936             ds_tv.type == RELATIVE_TO_START_TIME) {
937             rrd_set_error("specifying time relative to the 'start' "
938                           "or 'end' makes no sense here: %s", updvals[0]);
939             return -1;
940         }
941         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
942         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
943     } else if (strcmp(updvals[0], "N") == 0) {
944         gettimeofday(&tmp_time, 0);
945         normalize_time(&tmp_time);
946         *current_time = tmp_time.tv_sec;
947         *current_time_usec = tmp_time.tv_usec;
948     } else {
949         old_locale = setlocale(LC_NUMERIC, "C");
950         tmp = strtod(updvals[0], 0);
951         setlocale(LC_NUMERIC, old_locale);
952         *current_time = floor(tmp);
953         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
954     }
955     /* dont do any correction for old version RRDs */
956     if (version < 3)
957         *current_time_usec = 0;
958
959     if (*current_time < rrd->live_head->last_up ||
960         (*current_time == rrd->live_head->last_up &&
961          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
962         rrd_set_error("illegal attempt to update using time %ld when "
963                       "last update time is %ld (minimum one second step)",
964                       *current_time, rrd->live_head->last_up);
965         return -1;
966     }
967     return 0;
968 }
969
970 /*
971  * Update pdp_new by interpreting the updvals according to the DS type
972  * (COUNTER, GAUGE, etc.).
973  *
974  * Returns 0 on success, -1 on error.
975  */
976 static int update_pdp_prep(
977     rrd_t *rrd,
978     char **updvals,
979     rrd_value_t *pdp_new,
980     double interval)
981 {
982     unsigned long ds_idx;
983     int       ii;
984     char     *endptr;   /* used in the conversion */
985     double    rate;
986     char     *old_locale;
987     enum dst_en dst_idx;
988
989     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
990         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
991
992         /* make sure we do not build diffs with old last_ds values */
993         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
994             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
995             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
996         }
997
998         /* NOTE: DST_CDEF should never enter this if block, because
999          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1000          * accidently specified a value for the DST_CDEF. To handle this case,
1001          * an extra check is required. */
1002
1003         if ((updvals[ds_idx + 1][0] != 'U') &&
1004             (dst_idx != DST_CDEF) &&
1005             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1006             rate = DNAN;
1007
1008             /* pdp_new contains rate * time ... eg the bytes transferred during
1009              * the interval. Doing it this way saves a lot of math operations
1010              */
1011             switch (dst_idx) {
1012             case DST_COUNTER:
1013             case DST_DERIVE:
1014                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1015                     if ((updvals[ds_idx + 1][ii] < '0'
1016                          || updvals[ds_idx + 1][ii] > '9')
1017                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1018                         rrd_set_error("not a simple integer: '%s'",
1019                                       updvals[ds_idx + 1]);
1020                         return -1;
1021                     }
1022                 }
1023                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1024                     pdp_new[ds_idx] =
1025                         rrd_diff(updvals[ds_idx + 1],
1026                                  rrd->pdp_prep[ds_idx].last_ds);
1027                     if (dst_idx == DST_COUNTER) {
1028                         /* simple overflow catcher. This will fail
1029                          * terribly for non 32 or 64 bit counters
1030                          * ... are there any others in SNMP land?
1031                          */
1032                         if (pdp_new[ds_idx] < (double) 0.0)
1033                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1034                         if (pdp_new[ds_idx] < (double) 0.0)
1035                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1036                     }
1037                     rate = pdp_new[ds_idx] / interval;
1038                 } else {
1039                     pdp_new[ds_idx] = DNAN;
1040                 }
1041                 break;
1042             case DST_ABSOLUTE:
1043                 old_locale = setlocale(LC_NUMERIC, "C");
1044                 errno = 0;
1045                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1046                 setlocale(LC_NUMERIC, old_locale);
1047                 if (errno > 0) {
1048                     rrd_set_error("converting '%s' to float: %s",
1049                                   updvals[ds_idx + 1], rrd_strerror(errno));
1050                     return -1;
1051                 };
1052                 if (endptr[0] != '\0') {
1053                     rrd_set_error
1054                         ("conversion of '%s' to float not complete: tail '%s'",
1055                          updvals[ds_idx + 1], endptr);
1056                     return -1;
1057                 }
1058                 rate = pdp_new[ds_idx] / interval;
1059                 break;
1060             case DST_GAUGE:
1061                 errno = 0;
1062                 old_locale = setlocale(LC_NUMERIC, "C");
1063                 pdp_new[ds_idx] =
1064                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1065                 setlocale(LC_NUMERIC, old_locale);
1066                 if (errno) {
1067                     rrd_set_error("converting '%s' to float: %s",
1068                                   updvals[ds_idx + 1], rrd_strerror(errno));
1069                     return -1;
1070                 };
1071                 if (endptr[0] != '\0') {
1072                     rrd_set_error
1073                         ("conversion of '%s' to float not complete: tail '%s'",
1074                          updvals[ds_idx + 1], endptr);
1075                     return -1;
1076                 }
1077                 rate = pdp_new[ds_idx] / interval;
1078                 break;
1079             default:
1080                 rrd_set_error("rrd contains unknown DS type : '%s'",
1081                               rrd->ds_def[ds_idx].dst);
1082                 return -1;
1083             }
1084             /* break out of this for loop if the error string is set */
1085             if (rrd_test_error()) {
1086                 return -1;
1087             }
1088             /* make sure pdp_temp is neither too large or too small
1089              * if any of these occur it becomes unknown ...
1090              * sorry folks ... */
1091             if (!isnan(rate) &&
1092                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1093                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1094                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1095                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1096                 pdp_new[ds_idx] = DNAN;
1097             }
1098         } else {
1099             /* no news is news all the same */
1100             pdp_new[ds_idx] = DNAN;
1101         }
1102
1103
1104         /* make a copy of the command line argument for the next run */
1105 #ifdef DEBUG
1106         fprintf(stderr, "prep ds[%lu]\t"
1107                 "last_arg '%s'\t"
1108                 "this_arg '%s'\t"
1109                 "pdp_new %10.2f\n",
1110                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1111                 pdp_new[ds_idx]);
1112 #endif
1113         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1114                 LAST_DS_LEN - 1);
1115         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1116     }
1117     return 0;
1118 }
1119
1120 /*
1121  * How many PDP steps have elapsed since the last update? Returns the answer,
1122  * and stores the time between the last update and the last PDP in pre_time,
1123  * and the time between the last PDP and the current time in post_int.
1124  */
1125 static int calculate_elapsed_steps(
1126     rrd_t *rrd,
1127     unsigned long current_time,
1128     unsigned long current_time_usec,
1129     double interval,
1130     double *pre_int,
1131     double *post_int,
1132     unsigned long *proc_pdp_cnt)
1133 {
1134     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1135     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1136                                  * time */
1137     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1138                                  * when it was last updated */
1139     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1140
1141     /* when was the current pdp started */
1142     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1143     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1144
1145     /* when did the last pdp_st occur */
1146     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1147     occu_pdp_st = current_time - occu_pdp_age;
1148
1149     if (occu_pdp_st > proc_pdp_st) {
1150         /* OK we passed the pdp_st moment */
1151         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1152                                                                      * occurred before the latest
1153                                                                      * pdp_st moment*/
1154         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1155         *post_int = occu_pdp_age;   /* how much after it */
1156         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1157     } else {
1158         *pre_int = interval;
1159         *post_int = 0;
1160     }
1161
1162     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1163
1164 #ifdef DEBUG
1165     printf("proc_pdp_age %lu\t"
1166            "proc_pdp_st %lu\t"
1167            "occu_pfp_age %lu\t"
1168            "occu_pdp_st %lu\t"
1169            "int %lf\t"
1170            "pre_int %lf\t"
1171            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1172            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1173 #endif
1174
1175     /* compute the number of elapsed pdp_st moments */
1176     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1177 }
1178
1179 /*
1180  * Increment the PDP values by the values in pdp_new, or else initialize them.
1181  */
1182 static void simple_update(
1183     rrd_t *rrd,
1184     double interval,
1185     rrd_value_t *pdp_new)
1186 {
1187     int       i;
1188
1189     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1190         if (isnan(pdp_new[i])) {
1191             /* this is not really accurate if we use subsecond data arrival time
1192                should have thought of it when going subsecond resolution ...
1193                sorry next format change we will have it! */
1194             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1195                 floor(interval);
1196         } else {
1197             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1198                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1199             } else {
1200                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1201             }
1202         }
1203 #ifdef DEBUG
1204         fprintf(stderr,
1205                 "NO PDP  ds[%i]\t"
1206                 "value %10.2f\t"
1207                 "unkn_sec %5lu\n",
1208                 i,
1209                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1210                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1211 #endif
1212     }
1213 }
1214
1215 /*
1216  * Call process_pdp_st for each DS.
1217  *
1218  * Returns 0 on success, -1 on error.
1219  */
1220 static int process_all_pdp_st(
1221     rrd_t *rrd,
1222     double interval,
1223     double pre_int,
1224     double post_int,
1225     unsigned long elapsed_pdp_st,
1226     rrd_value_t *pdp_new,
1227     rrd_value_t *pdp_temp)
1228 {
1229     unsigned long ds_idx;
1230
1231     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1232        rate*seconds which occurred up to the last run.
1233        pdp_new[] contains rate*seconds from the latest run.
1234        pdp_temp[] will contain the rate for cdp */
1235
1236     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1237         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1238                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1239                            pdp_new, pdp_temp) == -1) {
1240             return -1;
1241         }
1242 #ifdef DEBUG
1243         fprintf(stderr, "PDP UPD ds[%lu]\t"
1244                 "elapsed_pdp_st %lu\t"
1245                 "pdp_temp %10.2f\t"
1246                 "new_prep %10.2f\t"
1247                 "new_unkn_sec %5lu\n",
1248                 ds_idx,
1249                 elapsed_pdp_st,
1250                 pdp_temp[ds_idx],
1251                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1252                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1253 #endif
1254     }
1255     return 0;
1256 }
1257
1258 /*
1259  * Process an update that occurs after one of the PDP moments.
1260  * Increments the PDP value, sets NAN if time greater than the
1261  * heartbeats have elapsed, processes CDEFs.
1262  *
1263  * Returns 0 on success, -1 on error.
1264  */
1265 static int process_pdp_st(
1266     rrd_t *rrd,
1267     unsigned long ds_idx,
1268     double interval,
1269     double pre_int,
1270     double post_int,
1271     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1272     rrd_value_t *pdp_new,
1273     rrd_value_t *pdp_temp)
1274 {
1275     int       i;
1276
1277     /* update pdp_prep to the current pdp_st. */
1278     double    pre_unknown = 0.0;
1279     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1280     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1281
1282     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1283
1284     rpnstack_init(&rpnstack);
1285
1286
1287     if (isnan(pdp_new[ds_idx])) {
1288         /* a final bit of unknown to be added before calculation
1289            we use a temporary variable for this so that we
1290            don't have to turn integer lines before using the value */
1291         pre_unknown = pre_int;
1292     } else {
1293         if (isnan(scratch[PDP_val].u_val)) {
1294             scratch[PDP_val].u_val = 0;
1295         }
1296         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1297     }
1298
1299     /* if too much of the pdp_prep is unknown we dump it */
1300     /* if the interval is larger thatn mrhb we get NAN */
1301     if ((interval > mrhb) ||
1302         (rrd->stat_head->pdp_step / 2.0 <
1303          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1304         pdp_temp[ds_idx] = DNAN;
1305     } else {
1306         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1307             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1308              pre_unknown);
1309     }
1310
1311     /* process CDEF data sources; remember each CDEF DS can
1312      * only reference other DS with a lower index number */
1313     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1314         rpnp_t   *rpnp;
1315
1316         rpnp =
1317             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1318         /* substitute data values for OP_VARIABLE nodes */
1319         for (i = 0; rpnp[i].op != OP_END; i++) {
1320             if (rpnp[i].op == OP_VARIABLE) {
1321                 rpnp[i].op = OP_NUMBER;
1322                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1323             }
1324         }
1325         /* run the rpn calculator */
1326         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1327             free(rpnp);
1328             rpnstack_free(&rpnstack);
1329             return -1;
1330         }
1331     }
1332
1333     /* make pdp_prep ready for the next run */
1334     if (isnan(pdp_new[ds_idx])) {
1335         /* this is not realy accurate if we use subsecond data arival time
1336            should have thought of it when going subsecond resolution ...
1337            sorry next format change we will have it! */
1338         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1339         scratch[PDP_val].u_val = DNAN;
1340     } else {
1341         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1342         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1343     }
1344     rpnstack_free(&rpnstack);
1345     return 0;
1346 }
1347
1348 /*
1349  * Iterate over all the RRAs for a given DS and:
1350  * 1. Decide whether to schedule a smooth later
1351  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1352  * 3. Update the CDP
1353  *
1354  * Returns 0 on success, -1 on error
1355  */
1356 static int update_all_cdp_prep(
1357     rrd_t *rrd,
1358     unsigned long *rra_step_cnt,
1359     unsigned long rra_begin,
1360     rrd_file_t *rrd_file,
1361     unsigned long elapsed_pdp_st,
1362     unsigned long proc_pdp_cnt,
1363     rrd_value_t **last_seasonal_coef,
1364     rrd_value_t **seasonal_coef,
1365     rrd_value_t *pdp_temp,
1366     unsigned long *rra_current,
1367     unsigned long *skip_update,
1368     int *schedule_smooth)
1369 {
1370     unsigned long rra_idx;
1371
1372     /* index into the CDP scratch array */
1373     enum cf_en current_cf;
1374     unsigned long rra_start;
1375
1376     /* number of rows to be updated in an RRA for a data value. */
1377     unsigned long start_pdp_offset;
1378
1379     rra_start = rra_begin;
1380     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1381         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1382         start_pdp_offset =
1383             rrd->rra_def[rra_idx].pdp_cnt -
1384             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1385         skip_update[rra_idx] = 0;
1386         if (start_pdp_offset <= elapsed_pdp_st) {
1387             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1388                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1389         } else {
1390             rra_step_cnt[rra_idx] = 0;
1391         }
1392
1393         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1394             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1395              * so that they will be correct for the next observed value; note that for
1396              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1397              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1398             if (rra_step_cnt[rra_idx] > 1) {
1399                 skip_update[rra_idx] = 1;
1400                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1401                                 elapsed_pdp_st, last_seasonal_coef);
1402                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1403                                 elapsed_pdp_st + 1, seasonal_coef);
1404             }
1405             /* periodically run a smoother for seasonal effects */
1406             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1407 #ifdef DEBUG
1408                 fprintf(stderr,
1409                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1410                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1411                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1412                         u_cnt);
1413 #endif
1414                 *schedule_smooth = 1;
1415             }
1416             *rra_current = rrd_tell(rrd_file);
1417         }
1418         if (rrd_test_error())
1419             return -1;
1420
1421         if (update_cdp_prep
1422             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1423              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1424              current_cf) == -1) {
1425             return -1;
1426         }
1427         rra_start +=
1428             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1429             sizeof(rrd_value_t);
1430     }
1431     return 0;
1432 }
1433
1434 /* 
1435  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1436  */
1437 static int do_schedule_smooth(
1438     rrd_t *rrd,
1439     unsigned long rra_idx,
1440     unsigned long elapsed_pdp_st)
1441 {
1442     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1443     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1444     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1445     unsigned long seasonal_smooth_idx =
1446         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1447     unsigned long *init_seasonal =
1448         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1449
1450     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1451      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1452      * really an RRA level, not a data source within RRA level parameter, but
1453      * the rra_def is read only for rrd_update (not flushed to disk). */
1454     if (*init_seasonal > BURNIN_CYCLES) {
1455         /* someone has no doubt invented a trick to deal with this wrap around,
1456          * but at least this code is clear. */
1457         if (seasonal_smooth_idx > cur_row) {
1458             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1459              * between PDP and CDP */
1460             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1461         }
1462         /* can't rely on negative numbers because we are working with
1463          * unsigned values */
1464         return (cur_row + elapsed_pdp_st >= row_cnt
1465                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1466     }
1467     /* mark off one of the burn-in cycles */
1468     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1469 }
1470
1471 /*
1472  * For a given RRA, iterate over the data sources and call the appropriate
1473  * consolidation function.
1474  *
1475  * Returns 0 on success, -1 on error.
1476  */
1477 static int update_cdp_prep(
1478     rrd_t *rrd,
1479     unsigned long elapsed_pdp_st,
1480     unsigned long start_pdp_offset,
1481     unsigned long *rra_step_cnt,
1482     int rra_idx,
1483     rrd_value_t *pdp_temp,
1484     rrd_value_t *last_seasonal_coef,
1485     rrd_value_t *seasonal_coef,
1486     int current_cf)
1487 {
1488     unsigned long ds_idx, cdp_idx;
1489
1490     /* update CDP_PREP areas */
1491     /* loop over data soures within each RRA */
1492     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1493
1494         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1495
1496         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1497             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1498                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1499                        elapsed_pdp_st, start_pdp_offset,
1500                        rrd->rra_def[rra_idx].pdp_cnt,
1501                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1502                        rra_idx, ds_idx);
1503         } else {
1504             /* Nothing to consolidate if there's one PDP per CDP. However, if
1505              * we've missed some PDPs, let's update null counters etc. */
1506             if (elapsed_pdp_st > 2) {
1507                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1508                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1509                           current_cf);
1510             }
1511         }
1512
1513         if (rrd_test_error())
1514             return -1;
1515     }                   /* endif data sources loop */
1516     return 0;
1517 }
1518
1519 /*
1520  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1521  * primary value, secondary value, and # of unknowns.
1522  */
1523 static void update_cdp(
1524     unival *scratch,
1525     int current_cf,
1526     rrd_value_t pdp_temp_val,
1527     unsigned long rra_step_cnt,
1528     unsigned long elapsed_pdp_st,
1529     unsigned long start_pdp_offset,
1530     unsigned long pdp_cnt,
1531     rrd_value_t xff,
1532     int i,
1533     int ii)
1534 {
1535     /* shorthand variables */
1536     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1537     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1538     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1539     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1540
1541     if (rra_step_cnt) {
1542         /* If we are in this block, as least 1 CDP value will be written to
1543          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1544          * to be written, then the "fill in" value is the CDP_secondary_val
1545          * entry. */
1546         if (isnan(pdp_temp_val)) {
1547             *cdp_unkn_pdp_cnt += start_pdp_offset;
1548             *cdp_secondary_val = DNAN;
1549         } else {
1550             /* CDP_secondary value is the RRA "fill in" value for intermediary
1551              * CDP data entries. No matter the CF, the value is the same because
1552              * the average, max, min, and last of a list of identical values is
1553              * the same, namely, the value itself. */
1554             *cdp_secondary_val = pdp_temp_val;
1555         }
1556
1557         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1558             *cdp_primary_val = DNAN;
1559             if (current_cf == CF_AVERAGE) {
1560                 *cdp_val =
1561                     initialize_average_carry_over(pdp_temp_val,
1562                                                   elapsed_pdp_st,
1563                                                   start_pdp_offset, pdp_cnt);
1564             } else {
1565                 *cdp_val = pdp_temp_val;
1566             }
1567         } else {
1568             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1569                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1570         }               /* endif meets xff value requirement for a valid value */
1571         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1572          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1573         if (isnan(pdp_temp_val))
1574             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1575         else
1576             *cdp_unkn_pdp_cnt = 0;
1577     } else {            /* rra_step_cnt[i]  == 0 */
1578
1579 #ifdef DEBUG
1580         if (isnan(*cdp_val)) {
1581             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1582                     i, ii);
1583         } else {
1584             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1585                     i, ii, *cdp_val);
1586         }
1587 #endif
1588         if (isnan(pdp_temp_val)) {
1589             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1590         } else {
1591             *cdp_val =
1592                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1593                                   current_cf, i, ii);
1594         }
1595     }
1596 }
1597
1598 /*
1599  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1600  * on the type of consolidation function.
1601  */
1602 static void initialize_cdp_val(
1603     unival *scratch,
1604     int current_cf,
1605     rrd_value_t pdp_temp_val,
1606     unsigned long elapsed_pdp_st,
1607     unsigned long start_pdp_offset,
1608     unsigned long pdp_cnt)
1609 {
1610     rrd_value_t cum_val, cur_val;
1611
1612     switch (current_cf) {
1613     case CF_AVERAGE:
1614         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1615         cur_val = IFDNAN(pdp_temp_val, 0.0);
1616         scratch[CDP_primary_val].u_val =
1617             (cum_val + cur_val * start_pdp_offset) /
1618             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1619         scratch[CDP_val].u_val =
1620             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1621                                           start_pdp_offset, pdp_cnt);
1622         break;
1623     case CF_MAXIMUM:
1624         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1625         cur_val = IFDNAN(pdp_temp_val, -DINF);
1626 #if 0
1627 #ifdef DEBUG
1628         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1629             fprintf(stderr,
1630                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1631                     i, ii);
1632             exit(-1);
1633         }
1634 #endif
1635 #endif
1636         if (cur_val > cum_val)
1637             scratch[CDP_primary_val].u_val = cur_val;
1638         else
1639             scratch[CDP_primary_val].u_val = cum_val;
1640         /* initialize carry over value */
1641         scratch[CDP_val].u_val = pdp_temp_val;
1642         break;
1643     case CF_MINIMUM:
1644         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1645         cur_val = IFDNAN(pdp_temp_val, DINF);
1646 #if 0
1647 #ifdef DEBUG
1648         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1649             fprintf(stderr,
1650                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1651                     ii);
1652             exit(-1);
1653         }
1654 #endif
1655 #endif
1656         if (cur_val < cum_val)
1657             scratch[CDP_primary_val].u_val = cur_val;
1658         else
1659             scratch[CDP_primary_val].u_val = cum_val;
1660         /* initialize carry over value */
1661         scratch[CDP_val].u_val = pdp_temp_val;
1662         break;
1663     case CF_LAST:
1664     default:
1665         scratch[CDP_primary_val].u_val = pdp_temp_val;
1666         /* initialize carry over value */
1667         scratch[CDP_val].u_val = pdp_temp_val;
1668         break;
1669     }
1670 }
1671
1672 /*
1673  * Update the consolidation function for Holt-Winters functions as
1674  * well as other functions that don't actually consolidate multiple
1675  * PDPs.
1676  */
1677 static void reset_cdp(
1678     rrd_t *rrd,
1679     unsigned long elapsed_pdp_st,
1680     rrd_value_t *pdp_temp,
1681     rrd_value_t *last_seasonal_coef,
1682     rrd_value_t *seasonal_coef,
1683     int rra_idx,
1684     int ds_idx,
1685     int cdp_idx,
1686     enum cf_en current_cf)
1687 {
1688     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1689
1690     switch (current_cf) {
1691     case CF_AVERAGE:
1692     default:
1693         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1694         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1695         break;
1696     case CF_SEASONAL:
1697     case CF_DEVSEASONAL:
1698         /* need to update cached seasonal values, so they are consistent
1699          * with the bulk update */
1700         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1701          * CDP_last_deviation are the same. */
1702         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1703         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1704         break;
1705     case CF_HWPREDICT:
1706     case CF_MHWPREDICT:
1707         /* need to update the null_count and last_null_count.
1708          * even do this for non-DNAN pdp_temp because the
1709          * algorithm is not learning from batch updates. */
1710         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1711         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1712         /* fall through */
1713     case CF_DEVPREDICT:
1714         scratch[CDP_primary_val].u_val = DNAN;
1715         scratch[CDP_secondary_val].u_val = DNAN;
1716         break;
1717     case CF_FAILURES:
1718         /* do not count missed bulk values as failures */
1719         scratch[CDP_primary_val].u_val = 0;
1720         scratch[CDP_secondary_val].u_val = 0;
1721         /* need to reset violations buffer.
1722          * could do this more carefully, but for now, just
1723          * assume a bulk update wipes away all violations. */
1724         erase_violations(rrd, cdp_idx, rra_idx);
1725         break;
1726     }
1727 }
1728
1729 static rrd_value_t initialize_average_carry_over(
1730     rrd_value_t pdp_temp_val,
1731     unsigned long elapsed_pdp_st,
1732     unsigned long start_pdp_offset,
1733     unsigned long pdp_cnt)
1734 {
1735     /* initialize carry over value */
1736     if (isnan(pdp_temp_val)) {
1737         return DNAN;
1738     }
1739     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1740 }
1741
1742 /*
1743  * Update or initialize a CDP value based on the consolidation
1744  * function.
1745  *
1746  * Returns the new value.
1747  */
1748 static rrd_value_t calculate_cdp_val(
1749     rrd_value_t cdp_val,
1750     rrd_value_t pdp_temp_val,
1751     unsigned long elapsed_pdp_st,
1752     int current_cf,
1753 #ifdef DEBUG
1754     int i,
1755     int ii
1756 #else
1757     int UNUSED(i),
1758     int UNUSED(ii)
1759 #endif
1760     )
1761 {
1762     if (isnan(cdp_val)) {
1763         if (current_cf == CF_AVERAGE) {
1764             pdp_temp_val *= elapsed_pdp_st;
1765         }
1766 #ifdef DEBUG
1767         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1768                 i, ii, pdp_temp_val);
1769 #endif
1770         return pdp_temp_val;
1771     }
1772     if (current_cf == CF_AVERAGE)
1773         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1774     if (current_cf == CF_MINIMUM)
1775         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1776     if (current_cf == CF_MAXIMUM)
1777         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1778
1779     return pdp_temp_val;
1780 }
1781
1782 /*
1783  * For each RRA, update the seasonal values and then call update_aberrant_CF
1784  * for each data source.
1785  *
1786  * Return 0 on success, -1 on error.
1787  */
1788 static int update_aberrant_cdps(
1789     rrd_t *rrd,
1790     rrd_file_t *rrd_file,
1791     unsigned long rra_begin,
1792     unsigned long *rra_current,
1793     unsigned long elapsed_pdp_st,
1794     rrd_value_t *pdp_temp,
1795     rrd_value_t **seasonal_coef)
1796 {
1797     unsigned long rra_idx, ds_idx, j;
1798
1799     /* number of PDP steps since the last update that
1800      * are assigned to the first CDP to be generated
1801      * since the last update. */
1802     unsigned short scratch_idx;
1803     unsigned long rra_start;
1804     enum cf_en current_cf;
1805
1806     /* this loop is only entered if elapsed_pdp_st < 3 */
1807     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1808          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1809         rra_start = rra_begin;
1810         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1811             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1812                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1813                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1814                     if (scratch_idx == CDP_primary_val) {
1815                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1816                                         elapsed_pdp_st + 1, seasonal_coef);
1817                     } else {
1818                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1819                                         elapsed_pdp_st + 2, seasonal_coef);
1820                     }
1821                     *rra_current = rrd_tell(rrd_file);
1822                 }
1823                 if (rrd_test_error())
1824                     return -1;
1825                 /* loop over data soures within each RRA */
1826                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1827                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1828                                        rra_idx * (rrd->stat_head->ds_cnt) +
1829                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1830                                        *seasonal_coef);
1831                 }
1832             }
1833             rra_start += rrd->rra_def[rra_idx].row_cnt
1834                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1835         }
1836     }
1837     return 0;
1838 }
1839
1840 /* 
1841  * Move sequentially through the file, writing one RRA at a time.  Note this
1842  * architecture divorces the computation of CDP with flushing updated RRA
1843  * entries to disk.
1844  *
1845  * Return 0 on success, -1 on error.
1846  */
1847 static int write_to_rras(
1848     rrd_t *rrd,
1849     rrd_file_t *rrd_file,
1850     unsigned long *rra_step_cnt,
1851     unsigned long rra_begin,
1852     unsigned long *rra_current,
1853     time_t current_time,
1854     unsigned long *skip_update,
1855     info_t **pcdp_summary)
1856 {
1857     unsigned long rra_idx;
1858     unsigned long rra_start;
1859     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1860     time_t    rra_time = 0; /* time of update for a RRA */
1861
1862     /* Ready to write to disk */
1863     rra_start = rra_begin;
1864     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1865         /* skip unless there's something to write */
1866         if (rra_step_cnt[rra_idx]) {
1867             /* write the first row */
1868 #ifdef DEBUG
1869             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1870 #endif
1871             rrd->rra_ptr[rra_idx].cur_row++;
1872             if (rrd->rra_ptr[rra_idx].cur_row >=
1873                 rrd->rra_def[rra_idx].row_cnt)
1874                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1875             /* position on the first row */
1876             rra_pos_tmp = rra_start +
1877                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1878                 sizeof(rrd_value_t);
1879             if (rra_pos_tmp != *rra_current) {
1880                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1881                     rrd_set_error("seek error in rrd");
1882                     return -1;
1883                 }
1884                 *rra_current = rra_pos_tmp;
1885             }
1886 #ifdef DEBUG
1887             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1888 #endif
1889             if (!skip_update[rra_idx]) {
1890                 if (*pcdp_summary != NULL) {
1891                     rra_time = (current_time - current_time
1892                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1893                                    rrd->stat_head->pdp_step))
1894                         -
1895                         ((rra_step_cnt[rra_idx] -
1896                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1897                          rrd->stat_head->pdp_step);
1898                 }
1899                 if (write_RRA_row
1900                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1901                      pcdp_summary, rra_time) == -1)
1902                     return -1;
1903             }
1904
1905             /* write other rows of the bulk update, if any */
1906             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1907                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1908                     rrd->rra_def[rra_idx].row_cnt) {
1909 #ifdef DEBUG
1910                     fprintf(stderr,
1911                             "Wraparound for RRA %s, %lu updates left\n",
1912                             rrd->rra_def[rra_idx].cf_nam,
1913                             rra_step_cnt[rra_idx] - 1);
1914 #endif
1915                     /* wrap */
1916                     rrd->rra_ptr[rra_idx].cur_row = 0;
1917                     /* seek back to beginning of current rra */
1918                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1919                         rrd_set_error("seek error in rrd");
1920                         return -1;
1921                     }
1922 #ifdef DEBUG
1923                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1924                             rrd_file->pos);
1925 #endif
1926                     *rra_current = rra_start;
1927                 }
1928                 if (!skip_update[rra_idx]) {
1929                     if (*pcdp_summary != NULL) {
1930                         rra_time = (current_time - current_time
1931                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1932                                        rrd->stat_head->pdp_step))
1933                             -
1934                             ((rra_step_cnt[rra_idx] -
1935                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1936                              rrd->stat_head->pdp_step);
1937                     }
1938                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1939                                       CDP_secondary_val, pcdp_summary,
1940                                       rra_time) == -1)
1941                         return -1;
1942                 }
1943             }
1944         }
1945         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1946             sizeof(rrd_value_t);
1947     }                   /* RRA LOOP */
1948
1949     return 0;
1950 }
1951
1952 /*
1953  * Write out one row of values (one value per DS) to the archive.
1954  *
1955  * Returns 0 on success, -1 on error.
1956  */
1957 static int write_RRA_row(
1958     rrd_file_t *rrd_file,
1959     rrd_t *rrd,
1960     unsigned long rra_idx,
1961     unsigned long *rra_current,
1962     unsigned short CDP_scratch_idx,
1963     info_t **pcdp_summary,
1964     time_t rra_time)
1965 {
1966     unsigned long ds_idx, cdp_idx;
1967     infoval   iv;
1968
1969     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1970         /* compute the cdp index */
1971         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1972 #ifdef DEBUG
1973         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1974                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1975                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1976 #endif
1977         if (*pcdp_summary != NULL) {
1978             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1979             /* append info to the return hash */
1980             *pcdp_summary = info_push(*pcdp_summary,
1981                                       sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1982                                                     rra_time,
1983                                                     rrd->rra_def[rra_idx].
1984                                                     cf_nam,
1985                                                     rrd->rra_def[rra_idx].
1986                                                     pdp_cnt,
1987                                                     rrd->ds_def[ds_idx].
1988                                                     ds_nam), RD_I_VAL, iv);
1989         }
1990         if (rrd_write(rrd_file,
1991                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1992                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1993             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1994             return -1;
1995         }
1996         *rra_current += sizeof(rrd_value_t);
1997     }
1998     return 0;
1999 }
2000
2001 /*
2002  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2003  *
2004  * Returns 0 on success, -1 otherwise
2005  */
2006 static int smooth_all_rras(
2007     rrd_t *rrd,
2008     rrd_file_t *rrd_file,
2009     unsigned long rra_begin)
2010 {
2011     unsigned long rra_start = rra_begin;
2012     unsigned long rra_idx;
2013
2014     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2015         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2016             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2017 #ifdef DEBUG
2018             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2019 #endif
2020             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2021             if (rrd_test_error())
2022                 return -1;
2023         }
2024         rra_start += rrd->rra_def[rra_idx].row_cnt
2025             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2026     }
2027     return 0;
2028 }
2029
2030 #ifndef HAVE_MMAP
2031 /*
2032  * Flush changes to disk (unless we're using mmap)
2033  *
2034  * Returns 0 on success, -1 otherwise
2035  */
2036 static int write_changes_to_disk(
2037     rrd_t *rrd,
2038     rrd_file_t *rrd_file,
2039     int version)
2040 {
2041     /* we just need to write back the live header portion now */
2042     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2043                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2044                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2045                  SEEK_SET) != 0) {
2046         rrd_set_error("seek rrd for live header writeback");
2047         return -1;
2048     }
2049     if (version >= 3) {
2050         if (rrd_write(rrd_file, rrd->live_head,
2051                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2052             rrd_set_error("rrd_write live_head to rrd");
2053             return -1;
2054         }
2055     } else {
2056         if (rrd_write(rrd_file, &rrd->live_head->last_up,
2057                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2058             rrd_set_error("rrd_write live_head to rrd");
2059             return -1;
2060         }
2061     }
2062
2063
2064     if (rrd_write(rrd_file, rrd->pdp_prep,
2065                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2066         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2067         rrd_set_error("rrd_write pdp_prep to rrd");
2068         return -1;
2069     }
2070
2071     if (rrd_write(rrd_file, rrd->cdp_prep,
2072                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2073                   rrd->stat_head->ds_cnt)
2074         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2075                       rrd->stat_head->ds_cnt)) {
2076
2077         rrd_set_error("rrd_write cdp_prep to rrd");
2078         return -1;
2079     }
2080
2081     if (rrd_write(rrd_file, rrd->rra_ptr,
2082                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2083         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2084         rrd_set_error("rrd_write rra_ptr to rrd");
2085         return -1;
2086     }
2087     return 0;
2088 }
2089 #endif