fixed 2. x-grid example ... since the lable is valid for the whole day, it must be...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
16
17 #include <locale.h>
18
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
38
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
43
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
48
49     struct _timeb current_time;
50
51     _ftime(&current_time);
52
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
55
56     return 0;
57 }
58
59 #endif
60
61 /* FUNCTION PROTOTYPES */
62
63 int       rrd_update_r(
64     const char *filename,
65     const char *tmplt,
66     int argc,
67     const char **argv);
68 int       _rrd_update(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv,
73     info_t *);
74
75 static int allocate_data_structures(
76     rrd_t *rrd,
77     char ***updvals,
78     rrd_value_t **pdp_temp,
79     const char *tmplt,
80     long **tmpl_idx,
81     unsigned long *tmpl_cnt,
82     unsigned long **rra_step_cnt,
83     unsigned long **skip_update,
84     rrd_value_t **pdp_new);
85
86 static int parse_template(
87     rrd_t *rrd,
88     const char *tmplt,
89     unsigned long *tmpl_cnt,
90     long *tmpl_idx);
91
92 static int process_arg(
93     char *step_start,
94     rrd_t *rrd,
95     rrd_file_t *rrd_file,
96     unsigned long rra_begin,
97     unsigned long *rra_current,
98     time_t *current_time,
99     unsigned long *current_time_usec,
100     rrd_value_t *pdp_temp,
101     rrd_value_t *pdp_new,
102     unsigned long *rra_step_cnt,
103     char **updvals,
104     long *tmpl_idx,
105     unsigned long tmpl_cnt,
106     info_t **pcdp_summary,
107     int version,
108     unsigned long *skip_update,
109     int *schedule_smooth);
110
111 static int parse_ds(
112     rrd_t *rrd,
113     char **updvals,
114     long *tmpl_idx,
115     char *input,
116     unsigned long tmpl_cnt,
117     time_t *current_time,
118     unsigned long *current_time_usec,
119     int version);
120
121 static int get_time_from_reading(
122     rrd_t *rrd,
123     char timesyntax,
124     char **updvals,
125     time_t *current_time,
126     unsigned long *current_time_usec,
127     int version);
128
129 static int update_pdp_prep(
130     rrd_t *rrd,
131     char **updvals,
132     rrd_value_t *pdp_new,
133     double interval);
134
135 static int calculate_elapsed_steps(
136     rrd_t *rrd,
137     unsigned long current_time,
138     unsigned long current_time_usec,
139     double interval,
140     double *pre_int,
141     double *post_int,
142     unsigned long *proc_pdp_cnt);
143
144 static void simple_update(
145     rrd_t *rrd,
146     double interval,
147     rrd_value_t *pdp_new);
148
149 static int process_all_pdp_st(
150     rrd_t *rrd,
151     double interval,
152     double pre_int,
153     double post_int,
154     unsigned long elapsed_pdp_st,
155     rrd_value_t *pdp_new,
156     rrd_value_t *pdp_temp);
157
158 static int process_pdp_st(
159     rrd_t *rrd,
160     unsigned long ds_idx,
161     double interval,
162     double pre_int,
163     double post_int,
164     long diff_pdp_st,
165     rrd_value_t *pdp_new,
166     rrd_value_t *pdp_temp);
167
168 static int update_all_cdp_prep(
169     rrd_t *rrd,
170     unsigned long *rra_step_cnt,
171     unsigned long rra_begin,
172     rrd_file_t *rrd_file,
173     unsigned long elapsed_pdp_st,
174     unsigned long proc_pdp_cnt,
175     rrd_value_t **last_seasonal_coef,
176     rrd_value_t **seasonal_coef,
177     rrd_value_t *pdp_temp,
178     unsigned long *rra_current,
179     unsigned long *skip_update,
180     int *schedule_smooth);
181
182 static int do_schedule_smooth(
183     rrd_t *rrd,
184     unsigned long rra_idx,
185     unsigned long elapsed_pdp_st);
186
187 static int update_cdp_prep(
188     rrd_t *rrd,
189     unsigned long elapsed_pdp_st,
190     unsigned long start_pdp_offset,
191     unsigned long *rra_step_cnt,
192     int rra_idx,
193     rrd_value_t *pdp_temp,
194     rrd_value_t *last_seasonal_coef,
195     rrd_value_t *seasonal_coef,
196     int current_cf);
197
198 static void update_cdp(
199     unival *scratch,
200     int current_cf,
201     rrd_value_t pdp_temp_val,
202     unsigned long rra_step_cnt,
203     unsigned long elapsed_pdp_st,
204     unsigned long start_pdp_offset,
205     unsigned long pdp_cnt,
206     rrd_value_t xff,
207     int i,
208     int ii);
209
210 static void initialize_cdp_val(
211     unival *scratch,
212     int current_cf,
213     rrd_value_t pdp_temp_val,
214     unsigned long elapsed_pdp_st,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
217
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
228
229 static rrd_value_t initialize_average_carry_over(
230     rrd_value_t pdp_temp_val,
231     unsigned long elapsed_pdp_st,
232     unsigned long start_pdp_offset,
233     unsigned long pdp_cnt);
234
235 static rrd_value_t calculate_cdp_val(
236     rrd_value_t cdp_val,
237     rrd_value_t pdp_temp_val,
238     unsigned long elapsed_pdp_st,
239     int current_cf,
240     int i,
241     int ii);
242
243 static int update_aberrant_cdps(
244     rrd_t *rrd,
245     rrd_file_t *rrd_file,
246     unsigned long rra_begin,
247     unsigned long *rra_current,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
251
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     unsigned long *rra_current,
258     time_t current_time,
259     unsigned long *skip_update,
260     info_t **pcdp_summary);
261
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned long *rra_current,
267     unsigned short CDP_scratch_idx,
268     info_t **pcdp_summary,
269     time_t rra_time);
270
271 static int smooth_all_rras(
272     rrd_t *rrd,
273     rrd_file_t *rrd_file,
274     unsigned long rra_begin);
275
276 #ifndef HAVE_MMAP
277 static int write_changes_to_disk(
278     rrd_t *rrd,
279     rrd_file_t *rrd_file,
280     int version);
281 #endif
282
283 /*
284  * normalize time as returned by gettimeofday. usec part must
285  * be always >= 0
286  */
287 static inline void normalize_time(
288     struct timeval *t)
289 {
290     if (t->tv_usec < 0) {
291         t->tv_sec--;
292         t->tv_usec += 1e6L;
293     }
294 }
295
296 /*
297  * Sets current_time and current_time_usec based on the current time.
298  * current_time_usec is set to 0 if the version number is 1 or 2.
299  */
300 static inline void initialize_time(
301     time_t *current_time,
302     unsigned long *current_time_usec,
303     int version)
304 {
305     struct timeval tmp_time;    /* used for time conversion */
306
307     gettimeofday(&tmp_time, 0);
308     normalize_time(&tmp_time);
309     *current_time = tmp_time.tv_sec;
310     if (version >= 3) {
311         *current_time_usec = tmp_time.tv_usec;
312     } else {
313         *current_time_usec = 0;
314     }
315 }
316
317 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
318
319 info_t   *rrd_update_v(
320     int argc,
321     char **argv)
322 {
323     char     *tmplt = NULL;
324     info_t   *result = NULL;
325     infoval   rc;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
330
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
334
335     while (1) {
336         int       option_index = 0;
337         int       opt;
338
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340
341         if (opt == EOF)
342             break;
343
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
348
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
354
355     /* need at least 2 arguments: filename, data. */
356     if (argc - optind < 2) {
357         rrd_set_error("Not enough arguments");
358         goto end_tag;
359     }
360     rc.u_int = 0;
361     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362     rc.u_int = _rrd_update(argv[optind], tmplt,
363                            argc - optind - 1,
364                            (const char **) (argv + optind + 1), result);
365     result->value.u_int = rc.u_int;
366   end_tag:
367     return result;
368 }
369
370 int rrd_update(
371     int argc,
372     char **argv)
373 {
374     struct option long_options[] = {
375         {"template", required_argument, 0, 't'},
376         {0, 0, 0, 0}
377     };
378     int       option_index = 0;
379     int       opt;
380     char     *tmplt = NULL;
381     int       rc = -1;
382
383     optind = 0;
384     opterr = 0;         /* initialize getopt */
385
386     while (1) {
387         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
388
389         if (opt == EOF)
390             break;
391
392         switch (opt) {
393         case 't':
394             tmplt = strdup(optarg);
395             break;
396
397         case '?':
398             rrd_set_error("unknown option '%s'", argv[optind - 1]);
399             goto out;
400         }
401     }
402
403     /* need at least 2 arguments: filename, data. */
404     if (argc - optind < 2) {
405         rrd_set_error("Not enough arguments");
406         goto out;
407     }
408
409     rc = rrd_update_r(argv[optind], tmplt,
410                       argc - optind - 1, (const char **) (argv + optind + 1));
411   out:
412     free(tmplt);
413     return rc;
414 }
415
416 int rrd_update_r(
417     const char *filename,
418     const char *tmplt,
419     int argc,
420     const char **argv)
421 {
422     return _rrd_update(filename, tmplt, argc, argv, NULL);
423 }
424
425 int _rrd_update(
426     const char *filename,
427     const char *tmplt,
428     int argc,
429     const char **argv,
430     info_t *pcdp_summary)
431 {
432
433     int       arg_i = 2;
434
435     unsigned long rra_begin;    /* byte pointer to the rra
436                                  * area in the rrd file.  this
437                                  * pointer never changes value */
438     unsigned long rra_current;  /* byte pointer to the current write
439                                  * spot in the rrd file. */
440     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
441                              * to the existing entry */
442     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
443                              * to the cdp values */
444
445     long     *tmpl_idx; /* index representing the settings
446                          * transported by the tmplt index */
447     unsigned long tmpl_cnt = 2; /* time and data */
448     rrd_t     rrd;
449     time_t    current_time = 0;
450     unsigned long current_time_usec = 0;    /* microseconds part of current time */
451     char    **updvals;
452     int       schedule_smooth = 0;
453
454     /* number of elapsed PDP steps since last update */
455     unsigned long *rra_step_cnt = NULL;
456
457     int       version;  /* rrd version */
458     rrd_file_t *rrd_file;
459     char     *arg_copy; /* for processing the argv */
460     unsigned long *skip_update; /* RRAs to advance but not write */
461
462     /* need at least 1 arguments: data. */
463     if (argc < 1) {
464         rrd_set_error("Not enough arguments");
465         goto err_out;
466     }
467
468     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
469         goto err_free;
470     }
471     /* We are now at the beginning of the rra's */
472     rra_current = rra_begin = rrd_file->header_len;
473
474     version = atoi(rrd.stat_head->version);
475
476     initialize_time(&current_time, &current_time_usec, version);
477
478     /* get exclusive lock to whole file.
479      * lock gets removed when we close the file.
480      */
481     if (LockRRD(rrd_file->fd) != 0) {
482         rrd_set_error("could not lock RRD");
483         goto err_close;
484     }
485
486     if (allocate_data_structures(&rrd, &updvals,
487                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
488                                  &rra_step_cnt, &skip_update,
489                                  &pdp_new) == -1) {
490         goto err_close;
491     }
492
493     /* loop through the arguments. */
494     for (arg_i = 0; arg_i < argc; arg_i++) {
495         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
496             rrd_set_error("failed duplication argv entry");
497             break;
498         }
499         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
500                         &current_time, &current_time_usec, pdp_temp, pdp_new,
501                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
502                         &pcdp_summary, version, skip_update,
503                         &schedule_smooth) == -1) {
504             free(arg_copy);
505             break;
506         }
507         free(arg_copy);
508     }
509
510     free(rra_step_cnt);
511
512     /* if we got here and if there is an error and if the file has not been
513      * written to, then close things up and return. */
514     if (rrd_test_error()) {
515         goto err_free_structures;
516     }
517 #ifndef HAVE_MMAP
518     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
519         goto err_free_structures;
520     }
521 #endif
522
523     /* calling the smoothing code here guarantees at most one smoothing
524      * operation per rrd_update call. Unfortunately, it is possible with bulk
525      * updates, or a long-delayed update for smoothing to occur off-schedule.
526      * This really isn't critical except during the burn-in cycles. */
527     if (schedule_smooth) {
528         smooth_all_rras(&rrd, rrd_file, rra_begin);
529     }
530
531 /*    rrd_dontneed(rrd_file,&rrd); */
532     rrd_free(&rrd);
533     rrd_close(rrd_file);
534
535     free(pdp_new);
536     free(tmpl_idx);
537     free(pdp_temp);
538     free(skip_update);
539     free(updvals);
540     return 0;
541
542   err_free_structures:
543     free(pdp_new);
544     free(tmpl_idx);
545     free(pdp_temp);
546     free(skip_update);
547     free(updvals);
548   err_close:
549     rrd_close(rrd_file);
550   err_free:
551     rrd_free(&rrd);
552   err_out:
553     return -1;
554 }
555
556 /*
557  * get exclusive lock to whole file.
558  * lock gets removed when we close the file
559  *
560  * returns 0 on success
561  */
562 int LockRRD(
563     int in_file)
564 {
565     int       rcstat;
566
567     {
568 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
569         struct _stat st;
570
571         if (_fstat(in_file, &st) == 0) {
572             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
573         } else {
574             rcstat = -1;
575         }
576 #else
577         struct flock lock;
578
579         lock.l_type = F_WRLCK;  /* exclusive write lock */
580         lock.l_len = 0; /* whole file */
581         lock.l_start = 0;   /* start of file */
582         lock.l_whence = SEEK_SET;   /* end of file */
583
584         rcstat = fcntl(in_file, F_SETLK, &lock);
585 #endif
586     }
587
588     return (rcstat);
589 }
590
591 /*
592  * Allocate some important arrays used, and initialize the template.
593  *
594  * When it returns, either all of the structures are allocated
595  * or none of them are.
596  *
597  * Returns 0 on success, -1 on error.
598  */
599 static int allocate_data_structures(
600     rrd_t *rrd,
601     char ***updvals,
602     rrd_value_t **pdp_temp,
603     const char *tmplt,
604     long **tmpl_idx,
605     unsigned long *tmpl_cnt,
606     unsigned long **rra_step_cnt,
607     unsigned long **skip_update,
608     rrd_value_t **pdp_new)
609 {
610     unsigned  i, ii;
611     if ((*updvals = (char **) malloc(sizeof(char *)
612                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
613         rrd_set_error("allocating updvals pointer array.");
614         return -1;
615     }
616     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
617                                             * rrd->stat_head->ds_cnt)) ==
618         NULL) {
619         rrd_set_error("allocating pdp_temp.");
620         goto err_free_updvals;
621     }
622     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
623                                                  *
624                                                  rrd->stat_head->rra_cnt)) ==
625         NULL) {
626         rrd_set_error("allocating skip_update.");
627         goto err_free_pdp_temp;
628     }
629     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
630                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
631         rrd_set_error("allocating tmpl_idx.");
632         goto err_free_skip_update;
633     }
634     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
635                                                   *
636                                                   (rrd->stat_head->
637                                                    rra_cnt))) == NULL) {
638         rrd_set_error("allocating rra_step_cnt.");
639         goto err_free_tmpl_idx;
640     }
641
642     /* initialize tmplt redirector */
643     /* default config example (assume DS 1 is a CDEF DS)
644        tmpl_idx[0] -> 0; (time)
645        tmpl_idx[1] -> 1; (DS 0)
646        tmpl_idx[2] -> 3; (DS 2)
647        tmpl_idx[3] -> 4; (DS 3) */
648     (*tmpl_idx)[0] = 0; /* time */
649     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
650         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
651             (*tmpl_idx)[ii++] = i;
652     }
653     *tmpl_cnt = ii;
654
655     if (tmplt != NULL) {
656         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
657             goto err_free_rra_step_cnt;
658         }
659     }
660
661     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
662                                            * rrd->stat_head->ds_cnt)) == NULL) {
663         rrd_set_error("allocating pdp_new.");
664         goto err_free_rra_step_cnt;
665     }
666
667     return 0;
668
669   err_free_rra_step_cnt:
670     free(*rra_step_cnt);
671   err_free_tmpl_idx:
672     free(*tmpl_idx);
673   err_free_skip_update:
674     free(*skip_update);
675   err_free_pdp_temp:
676     free(*pdp_temp);
677   err_free_updvals:
678     free(*updvals);
679     return -1;
680 }
681
682 /*
683  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
684  *
685  * Returns 0 on success.
686  */
687 static int parse_template(
688     rrd_t *rrd,
689     const char *tmplt,
690     unsigned long *tmpl_cnt,
691     long *tmpl_idx)
692 {
693     char     *dsname, *tmplt_copy;
694     unsigned int tmpl_len, i;
695     int       ret = 0;
696
697     *tmpl_cnt = 1;      /* the first entry is the time */
698
699     /* we should work on a writeable copy here */
700     if ((tmplt_copy = strdup(tmplt)) == NULL) {
701         rrd_set_error("error copying tmplt '%s'", tmplt);
702         ret = -1;
703         goto out;
704     }
705
706     dsname = tmplt_copy;
707     tmpl_len = strlen(tmplt_copy);
708     for (i = 0; i <= tmpl_len; i++) {
709         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
710             tmplt_copy[i] = '\0';
711             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
712                 rrd_set_error("tmplt contains more DS definitions than RRD");
713                 ret = -1;
714                 goto out_free_tmpl_copy;
715             }
716             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
717                 rrd_set_error("unknown DS name '%s'", dsname);
718                 ret = -1;
719                 goto out_free_tmpl_copy;
720             }
721             /* go to the next entry on the tmplt_copy */
722             if (i < tmpl_len)
723                 dsname = &tmplt_copy[i + 1];
724         }
725     }
726   out_free_tmpl_copy:
727     free(tmplt_copy);
728   out:
729     return ret;
730 }
731
732 /*
733  * Parse an update string, updates the primary data points (PDPs)
734  * and consolidated data points (CDPs), and writes changes to the RRAs.
735  *
736  * Returns 0 on success, -1 on error.
737  */
738 static int process_arg(
739     char *step_start,
740     rrd_t *rrd,
741     rrd_file_t *rrd_file,
742     unsigned long rra_begin,
743     unsigned long *rra_current,
744     time_t *current_time,
745     unsigned long *current_time_usec,
746     rrd_value_t *pdp_temp,
747     rrd_value_t *pdp_new,
748     unsigned long *rra_step_cnt,
749     char **updvals,
750     long *tmpl_idx,
751     unsigned long tmpl_cnt,
752     info_t **pcdp_summary,
753     int version,
754     unsigned long *skip_update,
755     int *schedule_smooth)
756 {
757     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
758
759     /* a vector of future Holt-Winters seasonal coefs */
760     unsigned long elapsed_pdp_st;
761
762     double    interval, pre_int, post_int;  /* interval between this and
763                                              * the last run */
764     unsigned long proc_pdp_cnt;
765     unsigned long rra_start;
766
767     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
768                  current_time, current_time_usec, version) == -1) {
769         return -1;
770     }
771     /* seek to the beginning of the rra's */
772     if (*rra_current != rra_begin) {
773 #ifndef HAVE_MMAP
774         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
775             rrd_set_error("seek error in rrd");
776             return -1;
777         }
778 #endif
779         *rra_current = rra_begin;
780     }
781     rra_start = rra_begin;
782
783     interval = (double) (*current_time - rrd->live_head->last_up)
784         + (double) ((long) *current_time_usec -
785                     (long) rrd->live_head->last_up_usec) / 1e6f;
786
787     /* process the data sources and update the pdp_prep 
788      * area accordingly */
789     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
790         return -1;
791     }
792
793     elapsed_pdp_st = calculate_elapsed_steps(rrd,
794                                              *current_time,
795                                              *current_time_usec, interval,
796                                              &pre_int, &post_int,
797                                              &proc_pdp_cnt);
798
799     /* has a pdp_st moment occurred since the last run ? */
800     if (elapsed_pdp_st == 0) {
801         /* no we have not passed a pdp_st moment. therefore update is simple */
802         simple_update(rrd, interval, pdp_new);
803     } else {
804         /* an pdp_st has occurred. */
805         if (process_all_pdp_st(rrd, interval,
806                                pre_int, post_int,
807                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
808             return -1;
809         }
810         if (update_all_cdp_prep(rrd, rra_step_cnt,
811                                 rra_begin, rrd_file,
812                                 elapsed_pdp_st,
813                                 proc_pdp_cnt,
814                                 &last_seasonal_coef,
815                                 &seasonal_coef,
816                                 pdp_temp, rra_current,
817                                 skip_update, schedule_smooth) == -1) {
818             goto err_free_coefficients;
819         }
820         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
821                                  elapsed_pdp_st, pdp_temp,
822                                  &seasonal_coef) == -1) {
823             goto err_free_coefficients;
824         }
825         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
826                           rra_current, *current_time, skip_update,
827                           pcdp_summary) == -1) {
828             goto err_free_coefficients;
829         }
830     }                   /* endif a pdp_st has occurred */
831     rrd->live_head->last_up = *current_time;
832     rrd->live_head->last_up_usec = *current_time_usec;
833
834     free(seasonal_coef);
835     free(last_seasonal_coef);
836     return 0;
837
838   err_free_coefficients:
839     free(seasonal_coef);
840     free(last_seasonal_coef);
841     return -1;
842 }
843
844 /*
845  * Parse a DS string (time + colon-separated values), storing the
846  * results in current_time, current_time_usec, and updvals.
847  *
848  * Returns 0 on success, -1 on error.
849  */
850 static int parse_ds(
851     rrd_t *rrd,
852     char **updvals,
853     long *tmpl_idx,
854     char *input,
855     unsigned long tmpl_cnt,
856     time_t *current_time,
857     unsigned long *current_time_usec,
858     int version)
859 {
860     char     *p;
861     unsigned long i;
862     char      timesyntax;
863
864     updvals[0] = input;
865     /* initialize all ds input to unknown except the first one
866        which has always got to be set */
867     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
868         updvals[i] = "U";
869
870     /* separate all ds elements; first must be examined separately
871        due to alternate time syntax */
872     if ((p = strchr(input, '@')) != NULL) {
873         timesyntax = '@';
874     } else if ((p = strchr(input, ':')) != NULL) {
875         timesyntax = ':';
876     } else {
877         rrd_set_error("expected timestamp not found in data source from %s",
878                       input);
879         return -1;
880     }
881     *p = '\0';
882     i = 1;
883     updvals[tmpl_idx[i++]] = p + 1;
884     while (*(++p)) {
885         if (*p == ':') {
886             *p = '\0';
887             if (i < tmpl_cnt) {
888                 updvals[tmpl_idx[i++]] = p + 1;
889             }
890         }
891     }
892
893     if (i != tmpl_cnt) {
894         rrd_set_error("expected %lu data source readings (got %lu) from %s",
895                       tmpl_cnt - 1, i, input);
896         return -1;
897     }
898
899     if (get_time_from_reading(rrd, timesyntax, updvals,
900                               current_time, current_time_usec,
901                               version) == -1) {
902         return -1;
903     }
904     return 0;
905 }
906
907 /*
908  * Parse the time in a DS string, store it in current_time and 
909  * current_time_usec and verify that it's later than the last
910  * update for this DS.
911  *
912  * Returns 0 on success, -1 on error.
913  */
914 static int get_time_from_reading(
915     rrd_t *rrd,
916     char timesyntax,
917     char **updvals,
918     time_t *current_time,
919     unsigned long *current_time_usec,
920     int version)
921 {
922     double    tmp;
923     char     *parsetime_error = NULL;
924     char     *old_locale;
925     struct rrd_time_value ds_tv;
926     struct timeval tmp_time;    /* used for time conversion */
927
928     /* get the time from the reading ... handle N */
929     if (timesyntax == '@') {    /* at-style */
930         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
931             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
932             return -1;
933         }
934         if (ds_tv.type == RELATIVE_TO_END_TIME ||
935             ds_tv.type == RELATIVE_TO_START_TIME) {
936             rrd_set_error("specifying time relative to the 'start' "
937                           "or 'end' makes no sense here: %s", updvals[0]);
938             return -1;
939         }
940         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
941         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
942     } else if (strcmp(updvals[0], "N") == 0) {
943         gettimeofday(&tmp_time, 0);
944         normalize_time(&tmp_time);
945         *current_time = tmp_time.tv_sec;
946         *current_time_usec = tmp_time.tv_usec;
947     } else {
948         old_locale = setlocale(LC_NUMERIC, "C");
949         tmp = strtod(updvals[0], 0);
950         setlocale(LC_NUMERIC, old_locale);
951         *current_time = floor(tmp);
952         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
953     }
954     /* dont do any correction for old version RRDs */
955     if (version < 3)
956         *current_time_usec = 0;
957
958     if (*current_time < rrd->live_head->last_up ||
959         (*current_time == rrd->live_head->last_up &&
960          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
961         rrd_set_error("illegal attempt to update using time %ld when "
962                       "last update time is %ld (minimum one second step)",
963                       *current_time, rrd->live_head->last_up);
964         return -1;
965     }
966     return 0;
967 }
968
969 /*
970  * Update pdp_new by interpreting the updvals according to the DS type
971  * (COUNTER, GAUGE, etc.).
972  *
973  * Returns 0 on success, -1 on error.
974  */
975 static int update_pdp_prep(
976     rrd_t *rrd,
977     char **updvals,
978     rrd_value_t *pdp_new,
979     double interval)
980 {
981     unsigned long ds_idx;
982     int       ii;
983     char     *endptr;   /* used in the conversion */
984     double    rate;
985     char     *old_locale;
986     enum dst_en dst_idx;
987
988     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
989         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
990
991         /* make sure we do not build diffs with old last_ds values */
992         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
993             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
994             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
995         }
996
997         /* NOTE: DST_CDEF should never enter this if block, because
998          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
999          * accidently specified a value for the DST_CDEF. To handle this case,
1000          * an extra check is required. */
1001
1002         if ((updvals[ds_idx + 1][0] != 'U') &&
1003             (dst_idx != DST_CDEF) &&
1004             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1005             rate = DNAN;
1006
1007             /* pdp_new contains rate * time ... eg the bytes transferred during
1008              * the interval. Doing it this way saves a lot of math operations
1009              */
1010             switch (dst_idx) {
1011             case DST_COUNTER:
1012             case DST_DERIVE:
1013                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1014                     if ((updvals[ds_idx + 1][ii] < '0'
1015                          || updvals[ds_idx + 1][ii] > '9')
1016                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1017                         rrd_set_error("not a simple integer: '%s'",
1018                                       updvals[ds_idx + 1]);
1019                         return -1;
1020                     }
1021                 }
1022                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1023                     pdp_new[ds_idx] =
1024                         rrd_diff(updvals[ds_idx + 1],
1025                                  rrd->pdp_prep[ds_idx].last_ds);
1026                     if (dst_idx == DST_COUNTER) {
1027                         /* simple overflow catcher. This will fail
1028                          * terribly for non 32 or 64 bit counters
1029                          * ... are there any others in SNMP land?
1030                          */
1031                         if (pdp_new[ds_idx] < (double) 0.0)
1032                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1033                         if (pdp_new[ds_idx] < (double) 0.0)
1034                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1035                     }
1036                     rate = pdp_new[ds_idx] / interval;
1037                 } else {
1038                     pdp_new[ds_idx] = DNAN;
1039                 }
1040                 break;
1041             case DST_ABSOLUTE:
1042                 old_locale = setlocale(LC_NUMERIC, "C");
1043                 errno = 0;
1044                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1045                 setlocale(LC_NUMERIC, old_locale);
1046                 if (errno > 0) {
1047                     rrd_set_error("converting '%s' to float: %s",
1048                                   updvals[ds_idx + 1], rrd_strerror(errno));
1049                     return -1;
1050                 };
1051                 if (endptr[0] != '\0') {
1052                     rrd_set_error
1053                         ("conversion of '%s' to float not complete: tail '%s'",
1054                          updvals[ds_idx + 1], endptr);
1055                     return -1;
1056                 }
1057                 rate = pdp_new[ds_idx] / interval;
1058                 break;
1059             case DST_GAUGE:
1060                 errno = 0;
1061                 old_locale = setlocale(LC_NUMERIC, "C");
1062                 pdp_new[ds_idx] =
1063                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1064                 setlocale(LC_NUMERIC, old_locale);
1065                 if (errno) {
1066                     rrd_set_error("converting '%s' to float: %s",
1067                                   updvals[ds_idx + 1], rrd_strerror(errno));
1068                     return -1;
1069                 };
1070                 if (endptr[0] != '\0') {
1071                     rrd_set_error
1072                         ("conversion of '%s' to float not complete: tail '%s'",
1073                          updvals[ds_idx + 1], endptr);
1074                     return -1;
1075                 }
1076                 rate = pdp_new[ds_idx] / interval;
1077                 break;
1078             default:
1079                 rrd_set_error("rrd contains unknown DS type : '%s'",
1080                               rrd->ds_def[ds_idx].dst);
1081                 return -1;
1082             }
1083             /* break out of this for loop if the error string is set */
1084             if (rrd_test_error()) {
1085                 return -1;
1086             }
1087             /* make sure pdp_temp is neither too large or too small
1088              * if any of these occur it becomes unknown ...
1089              * sorry folks ... */
1090             if (!isnan(rate) &&
1091                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1092                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1093                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1094                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1095                 pdp_new[ds_idx] = DNAN;
1096             }
1097         } else {
1098             /* no news is news all the same */
1099             pdp_new[ds_idx] = DNAN;
1100         }
1101
1102
1103         /* make a copy of the command line argument for the next run */
1104 #ifdef DEBUG
1105         fprintf(stderr, "prep ds[%lu]\t"
1106                 "last_arg '%s'\t"
1107                 "this_arg '%s'\t"
1108                 "pdp_new %10.2f\n",
1109                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1110                 pdp_new[ds_idx]);
1111 #endif
1112         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113                 LAST_DS_LEN - 1);
1114         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1115     }
1116     return 0;
1117 }
1118
1119 /*
1120  * How many PDP steps have elapsed since the last update? Returns the answer,
1121  * and stores the time between the last update and the last PDP in pre_time,
1122  * and the time between the last PDP and the current time in post_int.
1123  */
1124 static int calculate_elapsed_steps(
1125     rrd_t *rrd,
1126     unsigned long current_time,
1127     unsigned long current_time_usec,
1128     double interval,
1129     double *pre_int,
1130     double *post_int,
1131     unsigned long *proc_pdp_cnt)
1132 {
1133     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1134     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1135                                  * time */
1136     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1137                                  * when it was last updated */
1138     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1139
1140     /* when was the current pdp started */
1141     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1142     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1143
1144     /* when did the last pdp_st occur */
1145     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1146     occu_pdp_st = current_time - occu_pdp_age;
1147
1148     if (occu_pdp_st > proc_pdp_st) {
1149         /* OK we passed the pdp_st moment */
1150         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1151                                                                      * occurred before the latest
1152                                                                      * pdp_st moment*/
1153         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1154         *post_int = occu_pdp_age;   /* how much after it */
1155         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1156     } else {
1157         *pre_int = interval;
1158         *post_int = 0;
1159     }
1160
1161     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1162
1163 #ifdef DEBUG
1164     printf("proc_pdp_age %lu\t"
1165            "proc_pdp_st %lu\t"
1166            "occu_pfp_age %lu\t"
1167            "occu_pdp_st %lu\t"
1168            "int %lf\t"
1169            "pre_int %lf\t"
1170            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1171            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1172 #endif
1173
1174     /* compute the number of elapsed pdp_st moments */
1175     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1176 }
1177
1178 /*
1179  * Increment the PDP values by the values in pdp_new, or else initialize them.
1180  */
1181 static void simple_update(
1182     rrd_t *rrd,
1183     double interval,
1184     rrd_value_t *pdp_new)
1185 {
1186     int       i;
1187
1188     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1189         if (isnan(pdp_new[i])) {
1190             /* this is not really accurate if we use subsecond data arrival time
1191                should have thought of it when going subsecond resolution ...
1192                sorry next format change we will have it! */
1193             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1194                 floor(interval);
1195         } else {
1196             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1197                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1198             } else {
1199                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1200             }
1201         }
1202 #ifdef DEBUG
1203         fprintf(stderr,
1204                 "NO PDP  ds[%i]\t"
1205                 "value %10.2f\t"
1206                 "unkn_sec %5lu\n",
1207                 i,
1208                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1209                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1210 #endif
1211     }
1212 }
1213
1214 /*
1215  * Call process_pdp_st for each DS.
1216  *
1217  * Returns 0 on success, -1 on error.
1218  */
1219 static int process_all_pdp_st(
1220     rrd_t *rrd,
1221     double interval,
1222     double pre_int,
1223     double post_int,
1224     unsigned long elapsed_pdp_st,
1225     rrd_value_t *pdp_new,
1226     rrd_value_t *pdp_temp)
1227 {
1228     unsigned long ds_idx;
1229
1230     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1231        rate*seconds which occurred up to the last run.
1232        pdp_new[] contains rate*seconds from the latest run.
1233        pdp_temp[] will contain the rate for cdp */
1234
1235     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1236         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1237                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1238                            pdp_new, pdp_temp) == -1) {
1239             return -1;
1240         }
1241 #ifdef DEBUG
1242         fprintf(stderr, "PDP UPD ds[%lu]\t"
1243                 "pdp_temp %10.2f\t"
1244                 "new_prep %10.2f\t"
1245                 "new_unkn_sec %5lu\n",
1246                 ds_idx, pdp_temp[ds_idx],
1247                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1248                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1249 #endif
1250     }
1251     return 0;
1252 }
1253
1254 /*
1255  * Process an update that occurs after one of the PDP moments.
1256  * Increments the PDP value, sets NAN if time greater than the
1257  * heartbeats have elapsed, processes CDEFs.
1258  *
1259  * Returns 0 on success, -1 on error.
1260  */
1261 static int process_pdp_st(
1262     rrd_t *rrd,
1263     unsigned long ds_idx,
1264     double interval,
1265     double pre_int,
1266     double post_int,
1267     long diff_pdp_st,
1268     rrd_value_t *pdp_new,
1269     rrd_value_t *pdp_temp)
1270 {
1271     int       i;
1272
1273     /* update pdp_prep to the current pdp_st. */
1274     double    pre_unknown = 0.0;
1275     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1276     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1277
1278     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1279
1280     rpnstack_init(&rpnstack);
1281
1282
1283     if (isnan(pdp_new[ds_idx])) {
1284         /* a final bit of unknown to be added bevore calculation
1285            we use a temporary variable for this so that we
1286            don't have to turn integer lines before using the value */
1287         pre_unknown = pre_int;
1288     } else {
1289         if (isnan(scratch[PDP_val].u_val)) {
1290             scratch[PDP_val].u_val = 0;
1291         }
1292         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1293     }
1294
1295     /* if too much of the pdp_prep is unknown we dump it */
1296     /* if the interval is larger thatn mrhb we get NAN */
1297     if ((interval > mrhb) ||
1298         (diff_pdp_st <= (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1299         pdp_temp[ds_idx] = DNAN;
1300     } else {
1301         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1302             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1303              pre_unknown);
1304     }
1305
1306     /* process CDEF data sources; remember each CDEF DS can
1307      * only reference other DS with a lower index number */
1308     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1309         rpnp_t   *rpnp;
1310
1311         rpnp =
1312             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1313         /* substitute data values for OP_VARIABLE nodes */
1314         for (i = 0; rpnp[i].op != OP_END; i++) {
1315             if (rpnp[i].op == OP_VARIABLE) {
1316                 rpnp[i].op = OP_NUMBER;
1317                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1318             }
1319         }
1320         /* run the rpn calculator */
1321         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1322             free(rpnp);
1323             rpnstack_free(&rpnstack);
1324             return -1;
1325         }
1326     }
1327
1328     /* make pdp_prep ready for the next run */
1329     if (isnan(pdp_new[ds_idx])) {
1330         /* this is not realy accurate if we use subsecond data arival time
1331            should have thought of it when going subsecond resolution ...
1332            sorry next format change we will have it! */
1333         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1334         scratch[PDP_val].u_val = DNAN;
1335     } else {
1336         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1337         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1338     }
1339     rpnstack_free(&rpnstack);
1340     return 0;
1341 }
1342
1343 /*
1344  * Iterate over all the RRAs for a given DS and:
1345  * 1. Decide whether to schedule a smooth later
1346  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1347  * 3. Update the CDP
1348  *
1349  * Returns 0 on success, -1 on error
1350  */
1351 static int update_all_cdp_prep(
1352     rrd_t *rrd,
1353     unsigned long *rra_step_cnt,
1354     unsigned long rra_begin,
1355     rrd_file_t *rrd_file,
1356     unsigned long elapsed_pdp_st,
1357     unsigned long proc_pdp_cnt,
1358     rrd_value_t **last_seasonal_coef,
1359     rrd_value_t **seasonal_coef,
1360     rrd_value_t *pdp_temp,
1361     unsigned long *rra_current,
1362     unsigned long *skip_update,
1363     int *schedule_smooth)
1364 {
1365     unsigned long rra_idx;
1366
1367     /* index into the CDP scratch array */
1368     enum cf_en current_cf;
1369     unsigned long rra_start;
1370
1371     /* number of rows to be updated in an RRA for a data value. */
1372     unsigned long start_pdp_offset;
1373
1374     rra_start = rra_begin;
1375     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1376         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1377         start_pdp_offset =
1378             rrd->rra_def[rra_idx].pdp_cnt -
1379             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1380         skip_update[rra_idx] = 0;
1381         if (start_pdp_offset <= elapsed_pdp_st) {
1382             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1383                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1384         } else {
1385             rra_step_cnt[rra_idx] = 0;
1386         }
1387
1388         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1389             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1390              * so that they will be correct for the next observed value; note that for
1391              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1392              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1393             if (rra_step_cnt[rra_idx] > 1) {
1394                 skip_update[rra_idx] = 1;
1395                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1396                                 elapsed_pdp_st, last_seasonal_coef);
1397                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1398                                 elapsed_pdp_st + 1, seasonal_coef);
1399             }
1400             /* periodically run a smoother for seasonal effects */
1401             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1402 #ifdef DEBUG
1403                 fprintf(stderr,
1404                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1405                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1406                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1407                         u_cnt);
1408 #endif
1409                 *schedule_smooth = 1;
1410             }
1411             *rra_current = rrd_tell(rrd_file);
1412         }
1413         if (rrd_test_error())
1414             return -1;
1415
1416         if (update_cdp_prep
1417             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1418              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1419              current_cf) == -1) {
1420             return -1;
1421         }
1422         rra_start +=
1423             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1424             sizeof(rrd_value_t);
1425     }
1426     return 0;
1427 }
1428
1429 /* 
1430  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1431  */
1432 static int do_schedule_smooth(
1433     rrd_t *rrd,
1434     unsigned long rra_idx,
1435     unsigned long elapsed_pdp_st)
1436 {
1437     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1438     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1439     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1440     unsigned long seasonal_smooth_idx =
1441         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1442     unsigned long *init_seasonal =
1443         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1444
1445     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1446      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1447      * really an RRA level, not a data source within RRA level parameter, but
1448      * the rra_def is read only for rrd_update (not flushed to disk). */
1449     if (*init_seasonal > BURNIN_CYCLES) {
1450         /* someone has no doubt invented a trick to deal with this wrap around,
1451          * but at least this code is clear. */
1452         if (seasonal_smooth_idx > cur_row) {
1453             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1454              * between PDP and CDP */
1455             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1456         }
1457         /* can't rely on negative numbers because we are working with
1458          * unsigned values */
1459         return (cur_row + elapsed_pdp_st >= row_cnt
1460                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1461     }
1462     /* mark off one of the burn-in cycles */
1463     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1464 }
1465
1466 /*
1467  * For a given RRA, iterate over the data sources and call the appropriate
1468  * consolidation function.
1469  *
1470  * Returns 0 on success, -1 on error.
1471  */
1472 static int update_cdp_prep(
1473     rrd_t *rrd,
1474     unsigned long elapsed_pdp_st,
1475     unsigned long start_pdp_offset,
1476     unsigned long *rra_step_cnt,
1477     int rra_idx,
1478     rrd_value_t *pdp_temp,
1479     rrd_value_t *last_seasonal_coef,
1480     rrd_value_t *seasonal_coef,
1481     int current_cf)
1482 {
1483     unsigned long ds_idx, cdp_idx;
1484
1485     /* update CDP_PREP areas */
1486     /* loop over data soures within each RRA */
1487     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1488
1489         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1490
1491         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1492             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1493                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1494                        elapsed_pdp_st, start_pdp_offset,
1495                        rrd->rra_def[rra_idx].pdp_cnt,
1496                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1497                        rra_idx, ds_idx);
1498         } else {
1499             /* Nothing to consolidate if there's one PDP per CDP. However, if
1500              * we've missed some PDPs, let's update null counters etc. */
1501             if (elapsed_pdp_st > 2) {
1502                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1503                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1504                           current_cf);
1505             }
1506         }
1507
1508         if (rrd_test_error())
1509             return -1;
1510     }                   /* endif data sources loop */
1511     return 0;
1512 }
1513
1514 /*
1515  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1516  * primary value, secondary value, and # of unknowns.
1517  */
1518 static void update_cdp(
1519     unival *scratch,
1520     int current_cf,
1521     rrd_value_t pdp_temp_val,
1522     unsigned long rra_step_cnt,
1523     unsigned long elapsed_pdp_st,
1524     unsigned long start_pdp_offset,
1525     unsigned long pdp_cnt,
1526     rrd_value_t xff,
1527     int i,
1528     int ii)
1529 {
1530     /* shorthand variables */
1531     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1532     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1533     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1534     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1535
1536     if (rra_step_cnt) {
1537         /* If we are in this block, as least 1 CDP value will be written to
1538          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1539          * to be written, then the "fill in" value is the CDP_secondary_val
1540          * entry. */
1541         if (isnan(pdp_temp_val)) {
1542             *cdp_unkn_pdp_cnt += start_pdp_offset;
1543             *cdp_secondary_val = DNAN;
1544         } else {
1545             /* CDP_secondary value is the RRA "fill in" value for intermediary
1546              * CDP data entries. No matter the CF, the value is the same because
1547              * the average, max, min, and last of a list of identical values is
1548              * the same, namely, the value itself. */
1549             *cdp_secondary_val = pdp_temp_val;
1550         }
1551
1552         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1553             *cdp_primary_val = DNAN;
1554             if (current_cf == CF_AVERAGE) {
1555                 *cdp_val =
1556                     initialize_average_carry_over(pdp_temp_val,
1557                                                   elapsed_pdp_st,
1558                                                   start_pdp_offset, pdp_cnt);
1559             } else {
1560                 *cdp_val = pdp_temp_val;
1561             }
1562         } else {
1563             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1564                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1565         }               /* endif meets xff value requirement for a valid value */
1566         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1567          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1568         if (isnan(pdp_temp_val))
1569             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1570         else
1571             *cdp_unkn_pdp_cnt = 0;
1572     } else {            /* rra_step_cnt[i]  == 0 */
1573
1574 #ifdef DEBUG
1575         if (isnan(*cdp_val)) {
1576             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1577                     i, ii);
1578         } else {
1579             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1580                     i, ii, *cdp_val);
1581         }
1582 #endif
1583         if (isnan(pdp_temp_val)) {
1584             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1585         } else {
1586             *cdp_val =
1587                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1588                                   current_cf, i, ii);
1589         }
1590     }
1591 }
1592
1593 /*
1594  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1595  * on the type of consolidation function.
1596  */
1597 static void initialize_cdp_val(
1598     unival *scratch,
1599     int current_cf,
1600     rrd_value_t pdp_temp_val,
1601     unsigned long elapsed_pdp_st,
1602     unsigned long start_pdp_offset,
1603     unsigned long pdp_cnt)
1604 {
1605     rrd_value_t cum_val, cur_val;
1606
1607     switch (current_cf) {
1608     case CF_AVERAGE:
1609         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1610         cur_val = IFDNAN(pdp_temp_val, 0.0);
1611         scratch[CDP_primary_val].u_val =
1612             (cum_val + cur_val * start_pdp_offset) /
1613             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1614         scratch[CDP_val].u_val =
1615             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1616                                           start_pdp_offset, pdp_cnt);
1617         break;
1618     case CF_MAXIMUM:
1619         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1620         cur_val = IFDNAN(pdp_temp_val, -DINF);
1621 #if 0
1622 #ifdef DEBUG
1623         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1624             fprintf(stderr,
1625                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1626                     i, ii);
1627             exit(-1);
1628         }
1629 #endif
1630 #endif
1631         if (cur_val > cum_val)
1632             scratch[CDP_primary_val].u_val = cur_val;
1633         else
1634             scratch[CDP_primary_val].u_val = cum_val;
1635         /* initialize carry over value */
1636         scratch[CDP_val].u_val = pdp_temp_val;
1637         break;
1638     case CF_MINIMUM:
1639         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1640         cur_val = IFDNAN(pdp_temp_val, DINF);
1641 #if 0
1642 #ifdef DEBUG
1643         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1644             fprintf(stderr,
1645                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1646                     ii);
1647             exit(-1);
1648         }
1649 #endif
1650 #endif
1651         if (cur_val < cum_val)
1652             scratch[CDP_primary_val].u_val = cur_val;
1653         else
1654             scratch[CDP_primary_val].u_val = cum_val;
1655         /* initialize carry over value */
1656         scratch[CDP_val].u_val = pdp_temp_val;
1657         break;
1658     case CF_LAST:
1659     default:
1660         scratch[CDP_primary_val].u_val = pdp_temp_val;
1661         /* initialize carry over value */
1662         scratch[CDP_val].u_val = pdp_temp_val;
1663         break;
1664     }
1665 }
1666
1667 /*
1668  * Update the consolidation function for Holt-Winters functions as
1669  * well as other functions that don't actually consolidate multiple
1670  * PDPs.
1671  */
1672 static void reset_cdp(
1673     rrd_t *rrd,
1674     unsigned long elapsed_pdp_st,
1675     rrd_value_t *pdp_temp,
1676     rrd_value_t *last_seasonal_coef,
1677     rrd_value_t *seasonal_coef,
1678     int rra_idx,
1679     int ds_idx,
1680     int cdp_idx,
1681     enum cf_en current_cf)
1682 {
1683     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1684
1685     switch (current_cf) {
1686     case CF_AVERAGE:
1687     default:
1688         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1689         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1690         break;
1691     case CF_SEASONAL:
1692     case CF_DEVSEASONAL:
1693         /* need to update cached seasonal values, so they are consistent
1694          * with the bulk update */
1695         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1696          * CDP_last_deviation are the same. */
1697         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1698         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1699         break;
1700     case CF_HWPREDICT:
1701     case CF_MHWPREDICT:
1702         /* need to update the null_count and last_null_count.
1703          * even do this for non-DNAN pdp_temp because the
1704          * algorithm is not learning from batch updates. */
1705         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1706         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1707         /* fall through */
1708     case CF_DEVPREDICT:
1709         scratch[CDP_primary_val].u_val = DNAN;
1710         scratch[CDP_secondary_val].u_val = DNAN;
1711         break;
1712     case CF_FAILURES:
1713         /* do not count missed bulk values as failures */
1714         scratch[CDP_primary_val].u_val = 0;
1715         scratch[CDP_secondary_val].u_val = 0;
1716         /* need to reset violations buffer.
1717          * could do this more carefully, but for now, just
1718          * assume a bulk update wipes away all violations. */
1719         erase_violations(rrd, cdp_idx, rra_idx);
1720         break;
1721     }
1722 }
1723
1724 static rrd_value_t initialize_average_carry_over(
1725     rrd_value_t pdp_temp_val,
1726     unsigned long elapsed_pdp_st,
1727     unsigned long start_pdp_offset,
1728     unsigned long pdp_cnt)
1729 {
1730     /* initialize carry over value */
1731     if (isnan(pdp_temp_val)) {
1732         return DNAN;
1733     }
1734     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1735 }
1736
1737 /*
1738  * Update or initialize a CDP value based on the consolidation
1739  * function.
1740  *
1741  * Returns the new value.
1742  */
1743 static rrd_value_t calculate_cdp_val(
1744     rrd_value_t cdp_val,
1745     rrd_value_t pdp_temp_val,
1746     unsigned long elapsed_pdp_st,
1747     int current_cf,
1748 #ifdef DEBUG
1749     int i,
1750     int ii
1751 #else
1752     int UNUSED(i),
1753     int UNUSED(ii)
1754 #endif
1755 )
1756 {
1757     if (isnan(cdp_val)) {
1758         if (current_cf == CF_AVERAGE) {
1759             pdp_temp_val *= elapsed_pdp_st;
1760         }
1761 #ifdef DEBUG
1762         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1763                 i, ii, pdp_temp_val);
1764 #endif
1765         return pdp_temp_val;
1766     }
1767     if (current_cf == CF_AVERAGE)
1768         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1769     if (current_cf == CF_MINIMUM)
1770         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1771     if (current_cf == CF_MAXIMUM)
1772         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1773
1774     return pdp_temp_val;
1775 }
1776
1777 /*
1778  * For each RRA, update the seasonal values and then call update_aberrant_CF
1779  * for each data source.
1780  *
1781  * Return 0 on success, -1 on error.
1782  */
1783 static int update_aberrant_cdps(
1784     rrd_t *rrd,
1785     rrd_file_t *rrd_file,
1786     unsigned long rra_begin,
1787     unsigned long *rra_current,
1788     unsigned long elapsed_pdp_st,
1789     rrd_value_t *pdp_temp,
1790     rrd_value_t **seasonal_coef)
1791 {
1792     unsigned long rra_idx, ds_idx, j;
1793
1794     /* number of PDP steps since the last update that
1795      * are assigned to the first CDP to be generated
1796      * since the last update. */
1797     unsigned short scratch_idx;
1798     unsigned long rra_start;
1799     enum cf_en current_cf;
1800
1801     /* this loop is only entered if elapsed_pdp_st < 3 */
1802     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1803          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1804         rra_start = rra_begin;
1805         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1806             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1807                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1808                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1809                     if (scratch_idx == CDP_primary_val) {
1810                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1811                                         elapsed_pdp_st + 1, seasonal_coef);
1812                     } else {
1813                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1814                                         elapsed_pdp_st + 2, seasonal_coef);
1815                     }
1816                     *rra_current = rrd_tell(rrd_file);
1817                 }
1818                 if (rrd_test_error())
1819                     return -1;
1820                 /* loop over data soures within each RRA */
1821                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1822                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1823                                        rra_idx * (rrd->stat_head->ds_cnt) +
1824                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1825                                        *seasonal_coef);
1826                 }
1827             }
1828             rra_start += rrd->rra_def[rra_idx].row_cnt
1829                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1830         }
1831     }
1832     return 0;
1833 }
1834
1835 /* 
1836  * Move sequentially through the file, writing one RRA at a time.  Note this
1837  * architecture divorces the computation of CDP with flushing updated RRA
1838  * entries to disk.
1839  *
1840  * Return 0 on success, -1 on error.
1841  */
1842 static int write_to_rras(
1843     rrd_t *rrd,
1844     rrd_file_t *rrd_file,
1845     unsigned long *rra_step_cnt,
1846     unsigned long rra_begin,
1847     unsigned long *rra_current,
1848     time_t current_time,
1849     unsigned long *skip_update,
1850     info_t **pcdp_summary)
1851 {
1852     unsigned long rra_idx;
1853     unsigned long rra_start;
1854     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1855     time_t    rra_time = 0; /* time of update for a RRA */
1856
1857     /* Ready to write to disk */
1858     rra_start = rra_begin;
1859     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1860         /* skip unless there's something to write */
1861         if (rra_step_cnt[rra_idx]) {
1862             /* write the first row */
1863 #ifdef DEBUG
1864             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1865 #endif
1866             rrd->rra_ptr[rra_idx].cur_row++;
1867             if (rrd->rra_ptr[rra_idx].cur_row >=
1868                 rrd->rra_def[rra_idx].row_cnt)
1869                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1870             /* position on the first row */
1871             rra_pos_tmp = rra_start +
1872                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1873                 sizeof(rrd_value_t);
1874             if (rra_pos_tmp != *rra_current) {
1875                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1876                     rrd_set_error("seek error in rrd");
1877                     return -1;
1878                 }
1879                 *rra_current = rra_pos_tmp;
1880             }
1881 #ifdef DEBUG
1882             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1883 #endif
1884             if (!skip_update[rra_idx]) {
1885                 if (*pcdp_summary != NULL) {
1886                     rra_time = (current_time - current_time
1887                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1888                                    rrd->stat_head->pdp_step))
1889                         -
1890                         ((rra_step_cnt[rra_idx] -
1891                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1892                          rrd->stat_head->pdp_step);
1893                 }
1894                 if (write_RRA_row
1895                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1896                      pcdp_summary, rra_time) == -1)
1897                     return -1;
1898             }
1899
1900             /* write other rows of the bulk update, if any */
1901             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1902                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1903                     rrd->rra_def[rra_idx].row_cnt) {
1904 #ifdef DEBUG
1905                     fprintf(stderr,
1906                             "Wraparound for RRA %s, %lu updates left\n",
1907                             rrd->rra_def[rra_idx].cf_nam,
1908                             rra_step_cnt[rra_idx] - 1);
1909 #endif
1910                     /* wrap */
1911                     rrd->rra_ptr[rra_idx].cur_row = 0;
1912                     /* seek back to beginning of current rra */
1913                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1914                         rrd_set_error("seek error in rrd");
1915                         return -1;
1916                     }
1917 #ifdef DEBUG
1918                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1919                             rrd_file->pos);
1920 #endif
1921                     *rra_current = rra_start;
1922                 }
1923                 if (!skip_update[rra_idx]) {
1924                     if (*pcdp_summary != NULL) {
1925                         rra_time = (current_time - current_time
1926                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1927                                        rrd->stat_head->pdp_step))
1928                             -
1929                             ((rra_step_cnt[rra_idx] -
1930                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1931                              rrd->stat_head->pdp_step);
1932                     }
1933                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1934                                       CDP_secondary_val, pcdp_summary,
1935                                       rra_time) == -1)
1936                         return -1;
1937                 }
1938             }
1939         }
1940         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1941             sizeof(rrd_value_t);
1942     }                   /* RRA LOOP */
1943
1944     return 0;
1945 }
1946
1947 /*
1948  * Write out one row of values (one value per DS) to the archive.
1949  *
1950  * Returns 0 on success, -1 on error.
1951  */
1952 static int write_RRA_row(
1953     rrd_file_t *rrd_file,
1954     rrd_t *rrd,
1955     unsigned long rra_idx,
1956     unsigned long *rra_current,
1957     unsigned short CDP_scratch_idx,
1958     info_t **pcdp_summary,
1959     time_t rra_time)
1960 {
1961     unsigned long ds_idx, cdp_idx;
1962     infoval   iv;
1963
1964     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1965         /* compute the cdp index */
1966         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1967 #ifdef DEBUG
1968         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1969                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1970                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1971 #endif
1972         if (*pcdp_summary != NULL) {
1973             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1974             /* append info to the return hash */
1975             *pcdp_summary = info_push(*pcdp_summary,
1976                                       sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1977                                                     rra_time,
1978                                                     rrd->rra_def[rra_idx].
1979                                                     cf_nam,
1980                                                     rrd->rra_def[rra_idx].
1981                                                     pdp_cnt,
1982                                                     rrd->ds_def[ds_idx].
1983                                                     ds_nam), RD_I_VAL, iv);
1984         }
1985         if (rrd_write(rrd_file,
1986                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1987                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1988             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1989             return -1;
1990         }
1991         *rra_current += sizeof(rrd_value_t);
1992     }
1993     return 0;
1994 }
1995
1996 /*
1997  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1998  *
1999  * Returns 0 on success, -1 otherwise
2000  */
2001 static int smooth_all_rras(
2002     rrd_t *rrd,
2003     rrd_file_t *rrd_file,
2004     unsigned long rra_begin)
2005 {
2006     unsigned long rra_start = rra_begin;
2007     unsigned long rra_idx;
2008
2009     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2010         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2011             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2012 #ifdef DEBUG
2013             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2014 #endif
2015             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2016             if (rrd_test_error())
2017                 return -1;
2018         }
2019         rra_start += rrd->rra_def[rra_idx].row_cnt
2020             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2021     }
2022     return 0;
2023 }
2024
2025 #ifndef HAVE_MMAP
2026 /*
2027  * Flush changes to disk (unless we're using mmap)
2028  *
2029  * Returns 0 on success, -1 otherwise
2030  */
2031 static int write_changes_to_disk(
2032     rrd_t *rrd,
2033     rrd_file_t *rrd_file,
2034     int version)
2035 {
2036     /* we just need to write back the live header portion now */
2037     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2038                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2039                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2040                  SEEK_SET) != 0) {
2041         rrd_set_error("seek rrd for live header writeback");
2042         return -1;
2043     }
2044     if (version >= 3) {
2045         if (rrd_write(rrd_file, rrd->live_head,
2046                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2047             rrd_set_error("rrd_write live_head to rrd");
2048             return -1;
2049         }
2050     } else {
2051         if (rrd_write(rrd_file, &rrd->live_head->last_up,
2052                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2053             rrd_set_error("rrd_write live_head to rrd");
2054             return -1;
2055         }
2056     }
2057
2058
2059     if (rrd_write(rrd_file, rrd->pdp_prep,
2060                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2061         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2062         rrd_set_error("rrd_write pdp_prep to rrd");
2063         return -1;
2064     }
2065
2066     if (rrd_write(rrd_file, rrd->cdp_prep,
2067                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2068                   rrd->stat_head->ds_cnt)
2069         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2070                       rrd->stat_head->ds_cnt)) {
2071
2072         rrd_set_error("rrd_write cdp_prep to rrd");
2073         return -1;
2074     }
2075
2076     if (rrd_write(rrd_file, rrd->rra_ptr,
2077                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2078         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2079         rrd_set_error("rrd_write rra_ptr to rrd");
2080         return -1;
2081     }
2082     return 0;
2083 }
2084 #endif