fixed processing of custom fonts René GARCIA <rene@margar.fr>
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
9
10 #include "rrd_tool.h"
11
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
17
18 #include <locale.h>
19
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
22
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
25
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
29  * replacement.
30  */
31 #include <sys/timeb.h>
32
33 #ifndef __MINGW32__
34 struct timeval {
35     time_t    tv_sec;   /* seconds */
36     long      tv_usec;  /* microseconds */
37 };
38 #endif
39
40 struct __timezone {
41     int       tz_minuteswest;   /* minutes W of Greenwich */
42     int       tz_dsttime;   /* type of dst correction */
43 };
44
45 static int gettimeofday(
46     struct timeval *t,
47     struct __timezone *tz)
48 {
49
50     struct _timeb current_time;
51
52     _ftime(&current_time);
53
54     t->tv_sec = current_time.time;
55     t->tv_usec = current_time.millitm * 1000;
56
57     return 0;
58 }
59
60 #endif
61
62 /* FUNCTION PROTOTYPES */
63
64 int       rrd_update_r(
65     const char *filename,
66     const char *tmplt,
67     int argc,
68     const char **argv);
69 int       _rrd_update(
70     const char *filename,
71     const char *tmplt,
72     int argc,
73     const char **argv,
74     rrd_info_t *);
75
76 static int allocate_data_structures(
77     rrd_t *rrd,
78     char ***updvals,
79     rrd_value_t **pdp_temp,
80     const char *tmplt,
81     long **tmpl_idx,
82     unsigned long *tmpl_cnt,
83     unsigned long **rra_step_cnt,
84     unsigned long **skip_update,
85     rrd_value_t **pdp_new);
86
87 static int parse_template(
88     rrd_t *rrd,
89     const char *tmplt,
90     unsigned long *tmpl_cnt,
91     long *tmpl_idx);
92
93 static int process_arg(
94     char *step_start,
95     rrd_t *rrd,
96     rrd_file_t *rrd_file,
97     unsigned long rra_begin,
98     time_t *current_time,
99     unsigned long *current_time_usec,
100     rrd_value_t *pdp_temp,
101     rrd_value_t *pdp_new,
102     unsigned long *rra_step_cnt,
103     char **updvals,
104     long *tmpl_idx,
105     unsigned long tmpl_cnt,
106     rrd_info_t ** pcdp_summary,
107     int version,
108     unsigned long *skip_update,
109     int *schedule_smooth);
110
111 static int parse_ds(
112     rrd_t *rrd,
113     char **updvals,
114     long *tmpl_idx,
115     char *input,
116     unsigned long tmpl_cnt,
117     time_t *current_time,
118     unsigned long *current_time_usec,
119     int version);
120
121 static int get_time_from_reading(
122     rrd_t *rrd,
123     char timesyntax,
124     char **updvals,
125     time_t *current_time,
126     unsigned long *current_time_usec,
127     int version);
128
129 static int update_pdp_prep(
130     rrd_t *rrd,
131     char **updvals,
132     rrd_value_t *pdp_new,
133     double interval);
134
135 static int calculate_elapsed_steps(
136     rrd_t *rrd,
137     unsigned long current_time,
138     unsigned long current_time_usec,
139     double interval,
140     double *pre_int,
141     double *post_int,
142     unsigned long *proc_pdp_cnt);
143
144 static void simple_update(
145     rrd_t *rrd,
146     double interval,
147     rrd_value_t *pdp_new);
148
149 static int process_all_pdp_st(
150     rrd_t *rrd,
151     double interval,
152     double pre_int,
153     double post_int,
154     unsigned long elapsed_pdp_st,
155     rrd_value_t *pdp_new,
156     rrd_value_t *pdp_temp);
157
158 static int process_pdp_st(
159     rrd_t *rrd,
160     unsigned long ds_idx,
161     double interval,
162     double pre_int,
163     double post_int,
164     long diff_pdp_st,
165     rrd_value_t *pdp_new,
166     rrd_value_t *pdp_temp);
167
168 static int update_all_cdp_prep(
169     rrd_t *rrd,
170     unsigned long *rra_step_cnt,
171     unsigned long rra_begin,
172     rrd_file_t *rrd_file,
173     unsigned long elapsed_pdp_st,
174     unsigned long proc_pdp_cnt,
175     rrd_value_t **last_seasonal_coef,
176     rrd_value_t **seasonal_coef,
177     rrd_value_t *pdp_temp,
178     unsigned long *skip_update,
179     int *schedule_smooth);
180
181 static int do_schedule_smooth(
182     rrd_t *rrd,
183     unsigned long rra_idx,
184     unsigned long elapsed_pdp_st);
185
186 static int update_cdp_prep(
187     rrd_t *rrd,
188     unsigned long elapsed_pdp_st,
189     unsigned long start_pdp_offset,
190     unsigned long *rra_step_cnt,
191     int rra_idx,
192     rrd_value_t *pdp_temp,
193     rrd_value_t *last_seasonal_coef,
194     rrd_value_t *seasonal_coef,
195     int current_cf);
196
197 static void update_cdp(
198     unival *scratch,
199     int current_cf,
200     rrd_value_t pdp_temp_val,
201     unsigned long rra_step_cnt,
202     unsigned long elapsed_pdp_st,
203     unsigned long start_pdp_offset,
204     unsigned long pdp_cnt,
205     rrd_value_t xff,
206     int i,
207     int ii);
208
209 static void initialize_cdp_val(
210     unival *scratch,
211     int current_cf,
212     rrd_value_t pdp_temp_val,
213     unsigned long elapsed_pdp_st,
214     unsigned long start_pdp_offset,
215     unsigned long pdp_cnt);
216
217 static void reset_cdp(
218     rrd_t *rrd,
219     unsigned long elapsed_pdp_st,
220     rrd_value_t *pdp_temp,
221     rrd_value_t *last_seasonal_coef,
222     rrd_value_t *seasonal_coef,
223     int rra_idx,
224     int ds_idx,
225     int cdp_idx,
226     enum cf_en current_cf);
227
228 static rrd_value_t initialize_average_carry_over(
229     rrd_value_t pdp_temp_val,
230     unsigned long elapsed_pdp_st,
231     unsigned long start_pdp_offset,
232     unsigned long pdp_cnt);
233
234 static rrd_value_t calculate_cdp_val(
235     rrd_value_t cdp_val,
236     rrd_value_t pdp_temp_val,
237     unsigned long elapsed_pdp_st,
238     int current_cf,
239     int i,
240     int ii);
241
242 static int update_aberrant_cdps(
243     rrd_t *rrd,
244     rrd_file_t *rrd_file,
245     unsigned long rra_begin,
246     unsigned long elapsed_pdp_st,
247     rrd_value_t *pdp_temp,
248     rrd_value_t **seasonal_coef);
249
250 static int write_to_rras(
251     rrd_t *rrd,
252     rrd_file_t *rrd_file,
253     unsigned long *rra_step_cnt,
254     unsigned long rra_begin,
255     time_t current_time,
256     unsigned long *skip_update,
257     rrd_info_t ** pcdp_summary);
258
259 static int write_RRA_row(
260     rrd_file_t *rrd_file,
261     rrd_t *rrd,
262     unsigned long rra_idx,
263     unsigned short CDP_scratch_idx,
264     rrd_info_t ** pcdp_summary,
265     time_t rra_time);
266
267 static int smooth_all_rras(
268     rrd_t *rrd,
269     rrd_file_t *rrd_file,
270     unsigned long rra_begin);
271
272 #ifndef HAVE_MMAP
273 static int write_changes_to_disk(
274     rrd_t *rrd,
275     rrd_file_t *rrd_file,
276     int version);
277 #endif
278
279 /*
280  * normalize time as returned by gettimeofday. usec part must
281  * be always >= 0
282  */
283 static inline void normalize_time(
284     struct timeval *t)
285 {
286     if (t->tv_usec < 0) {
287         t->tv_sec--;
288         t->tv_usec += 1e6L;
289     }
290 }
291
292 /*
293  * Sets current_time and current_time_usec based on the current time.
294  * current_time_usec is set to 0 if the version number is 1 or 2.
295  */
296 static inline void initialize_time(
297     time_t *current_time,
298     unsigned long *current_time_usec,
299     int version)
300 {
301     struct timeval tmp_time;    /* used for time conversion */
302
303     gettimeofday(&tmp_time, 0);
304     normalize_time(&tmp_time);
305     *current_time = tmp_time.tv_sec;
306     if (version >= 3) {
307         *current_time_usec = tmp_time.tv_usec;
308     } else {
309         *current_time_usec = 0;
310     }
311 }
312
313 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
314
315 rrd_info_t *rrd_update_v(
316     int argc,
317     char **argv)
318 {
319     char     *tmplt = NULL;
320     rrd_info_t *result = NULL;
321     rrd_infoval_t rc;
322     struct option long_options[] = {
323         {"template", required_argument, 0, 't'},
324         {0, 0, 0, 0}
325     };
326
327     rc.u_int = -1;
328     optind = 0;
329     opterr = 0;         /* initialize getopt */
330
331     while (1) {
332         int       option_index = 0;
333         int       opt;
334
335         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
336
337         if (opt == EOF)
338             break;
339
340         switch (opt) {
341         case 't':
342             tmplt = optarg;
343             break;
344
345         case '?':
346             rrd_set_error("unknown option '%s'", argv[optind - 1]);
347             goto end_tag;
348         }
349     }
350
351     /* need at least 2 arguments: filename, data. */
352     if (argc - optind < 2) {
353         rrd_set_error("Not enough arguments");
354         goto end_tag;
355     }
356     rc.u_int = 0;
357     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
358     rc.u_int = _rrd_update(argv[optind], tmplt,
359                            argc - optind - 1,
360                            (const char **) (argv + optind + 1), result);
361     result->value.u_int = rc.u_int;
362   end_tag:
363     return result;
364 }
365
366 int rrd_update(
367     int argc,
368     char **argv)
369 {
370     struct option long_options[] = {
371         {"template", required_argument, 0, 't'},
372         {0, 0, 0, 0}
373     };
374     int       option_index = 0;
375     int       opt;
376     char     *tmplt = NULL;
377     int       rc = -1;
378
379     optind = 0;
380     opterr = 0;         /* initialize getopt */
381
382     while (1) {
383         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
384
385         if (opt == EOF)
386             break;
387
388         switch (opt) {
389         case 't':
390             tmplt = strdup(optarg);
391             break;
392
393         case '?':
394             rrd_set_error("unknown option '%s'", argv[optind - 1]);
395             goto out;
396         }
397     }
398
399     /* need at least 2 arguments: filename, data. */
400     if (argc - optind < 2) {
401         rrd_set_error("Not enough arguments");
402         goto out;
403     }
404
405     rc = rrd_update_r(argv[optind], tmplt,
406                       argc - optind - 1, (const char **) (argv + optind + 1));
407   out:
408     free(tmplt);
409     return rc;
410 }
411
412 int rrd_update_r(
413     const char *filename,
414     const char *tmplt,
415     int argc,
416     const char **argv)
417 {
418     return _rrd_update(filename, tmplt, argc, argv, NULL);
419 }
420
421 int _rrd_update(
422     const char *filename,
423     const char *tmplt,
424     int argc,
425     const char **argv,
426     rrd_info_t * pcdp_summary)
427 {
428
429     int       arg_i = 2;
430
431     unsigned long rra_begin;    /* byte pointer to the rra
432                                  * area in the rrd file.  this
433                                  * pointer never changes value */
434     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
435                              * to the existing entry */
436     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
437                              * to the cdp values */
438
439     long     *tmpl_idx; /* index representing the settings
440                          * transported by the tmplt index */
441     unsigned long tmpl_cnt = 2; /* time and data */
442     rrd_t     rrd;
443     time_t    current_time = 0;
444     unsigned long current_time_usec = 0;    /* microseconds part of current time */
445     char    **updvals;
446     int       schedule_smooth = 0;
447
448     /* number of elapsed PDP steps since last update */
449     unsigned long *rra_step_cnt = NULL;
450
451     int       version;  /* rrd version */
452     rrd_file_t *rrd_file;
453     char     *arg_copy; /* for processing the argv */
454     unsigned long *skip_update; /* RRAs to advance but not write */
455
456     /* need at least 1 arguments: data. */
457     if (argc < 1) {
458         rrd_set_error("Not enough arguments");
459         goto err_out;
460     }
461
462     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
463         goto err_free;
464     }
465     /* We are now at the beginning of the rra's */
466     rra_begin = rrd_file->header_len;
467
468     version = atoi(rrd.stat_head->version);
469
470     initialize_time(&current_time, &current_time_usec, version);
471
472     /* get exclusive lock to whole file.
473      * lock gets removed when we close the file.
474      */
475     if (rrd_lock(rrd_file) != 0) {
476         rrd_set_error("could not lock RRD");
477         goto err_close;
478     }
479
480     if (allocate_data_structures(&rrd, &updvals,
481                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
482                                  &rra_step_cnt, &skip_update,
483                                  &pdp_new) == -1) {
484         goto err_close;
485     }
486
487     /* loop through the arguments. */
488     for (arg_i = 0; arg_i < argc; arg_i++) {
489         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
490             rrd_set_error("failed duplication argv entry");
491             break;
492         }
493         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
494                         &current_time, &current_time_usec, pdp_temp, pdp_new,
495                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
496                         &pcdp_summary, version, skip_update,
497                         &schedule_smooth) == -1) {
498             if (rrd_test_error()) { /* Should have error string always here */
499                 char     *save_error;
500
501                 /* Prepend file name to error message */
502                 if ((save_error = strdup(rrd_get_error())) != NULL) {
503                     rrd_set_error("%s: %s", filename, save_error);
504                     free(save_error);
505                 }
506             }
507             free(arg_copy);
508             break;
509         }
510         free(arg_copy);
511     }
512
513     free(rra_step_cnt);
514
515     /* if we got here and if there is an error and if the file has not been
516      * written to, then close things up and return. */
517     if (rrd_test_error()) {
518         goto err_free_structures;
519     }
520 #ifndef HAVE_MMAP
521     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
522         goto err_free_structures;
523     }
524 #endif
525
526     /* calling the smoothing code here guarantees at most one smoothing
527      * operation per rrd_update call. Unfortunately, it is possible with bulk
528      * updates, or a long-delayed update for smoothing to occur off-schedule.
529      * This really isn't critical except during the burn-in cycles. */
530     if (schedule_smooth) {
531         smooth_all_rras(&rrd, rrd_file, rra_begin);
532     }
533
534 /*    rrd_dontneed(rrd_file,&rrd); */
535     rrd_free(&rrd);
536     rrd_close(rrd_file);
537
538     free(pdp_new);
539     free(tmpl_idx);
540     free(pdp_temp);
541     free(skip_update);
542     free(updvals);
543     return 0;
544
545   err_free_structures:
546     free(pdp_new);
547     free(tmpl_idx);
548     free(pdp_temp);
549     free(skip_update);
550     free(updvals);
551   err_close:
552     rrd_close(rrd_file);
553   err_free:
554     rrd_free(&rrd);
555   err_out:
556     return -1;
557 }
558
559 /*
560  * get exclusive lock to whole file.
561  * lock gets removed when we close the file
562  *
563  * returns 0 on success
564  */
565 int rrd_lock(
566     rrd_file_t *file)
567 {
568     int       rcstat;
569
570     {
571 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
572         struct _stat st;
573
574         if (_fstat(file->fd, &st) == 0) {
575             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
576         } else {
577             rcstat = -1;
578         }
579 #else
580         struct flock lock;
581
582         lock.l_type = F_WRLCK;  /* exclusive write lock */
583         lock.l_len = 0; /* whole file */
584         lock.l_start = 0;   /* start of file */
585         lock.l_whence = SEEK_SET;   /* end of file */
586
587         rcstat = fcntl(file->fd, F_SETLK, &lock);
588 #endif
589     }
590
591     return (rcstat);
592 }
593
594 /*
595  * Allocate some important arrays used, and initialize the template.
596  *
597  * When it returns, either all of the structures are allocated
598  * or none of them are.
599  *
600  * Returns 0 on success, -1 on error.
601  */
602 static int allocate_data_structures(
603     rrd_t *rrd,
604     char ***updvals,
605     rrd_value_t **pdp_temp,
606     const char *tmplt,
607     long **tmpl_idx,
608     unsigned long *tmpl_cnt,
609     unsigned long **rra_step_cnt,
610     unsigned long **skip_update,
611     rrd_value_t **pdp_new)
612 {
613     unsigned  i, ii;
614     if ((*updvals = (char **) malloc(sizeof(char *)
615                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
616         rrd_set_error("allocating updvals pointer array.");
617         return -1;
618     }
619     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
620                                             * rrd->stat_head->ds_cnt)) ==
621         NULL) {
622         rrd_set_error("allocating pdp_temp.");
623         goto err_free_updvals;
624     }
625     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
626                                                  *
627                                                  rrd->stat_head->rra_cnt)) ==
628         NULL) {
629         rrd_set_error("allocating skip_update.");
630         goto err_free_pdp_temp;
631     }
632     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
633                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
634         rrd_set_error("allocating tmpl_idx.");
635         goto err_free_skip_update;
636     }
637     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
638                                                   *
639                                                   (rrd->stat_head->
640                                                    rra_cnt))) == NULL) {
641         rrd_set_error("allocating rra_step_cnt.");
642         goto err_free_tmpl_idx;
643     }
644
645     /* initialize tmplt redirector */
646     /* default config example (assume DS 1 is a CDEF DS)
647        tmpl_idx[0] -> 0; (time)
648        tmpl_idx[1] -> 1; (DS 0)
649        tmpl_idx[2] -> 3; (DS 2)
650        tmpl_idx[3] -> 4; (DS 3) */
651     (*tmpl_idx)[0] = 0; /* time */
652     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
653         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
654             (*tmpl_idx)[ii++] = i;
655     }
656     *tmpl_cnt = ii;
657
658     if (tmplt != NULL) {
659         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
660             goto err_free_rra_step_cnt;
661         }
662     }
663
664     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
665                                            * rrd->stat_head->ds_cnt)) == NULL) {
666         rrd_set_error("allocating pdp_new.");
667         goto err_free_rra_step_cnt;
668     }
669
670     return 0;
671
672   err_free_rra_step_cnt:
673     free(*rra_step_cnt);
674   err_free_tmpl_idx:
675     free(*tmpl_idx);
676   err_free_skip_update:
677     free(*skip_update);
678   err_free_pdp_temp:
679     free(*pdp_temp);
680   err_free_updvals:
681     free(*updvals);
682     return -1;
683 }
684
685 /*
686  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
687  *
688  * Returns 0 on success.
689  */
690 static int parse_template(
691     rrd_t *rrd,
692     const char *tmplt,
693     unsigned long *tmpl_cnt,
694     long *tmpl_idx)
695 {
696     char     *dsname, *tmplt_copy;
697     unsigned int tmpl_len, i;
698     int       ret = 0;
699
700     *tmpl_cnt = 1;      /* the first entry is the time */
701
702     /* we should work on a writeable copy here */
703     if ((tmplt_copy = strdup(tmplt)) == NULL) {
704         rrd_set_error("error copying tmplt '%s'", tmplt);
705         ret = -1;
706         goto out;
707     }
708
709     dsname = tmplt_copy;
710     tmpl_len = strlen(tmplt_copy);
711     for (i = 0; i <= tmpl_len; i++) {
712         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
713             tmplt_copy[i] = '\0';
714             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
715                 rrd_set_error("tmplt contains more DS definitions than RRD");
716                 ret = -1;
717                 goto out_free_tmpl_copy;
718             }
719             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
720                 rrd_set_error("unknown DS name '%s'", dsname);
721                 ret = -1;
722                 goto out_free_tmpl_copy;
723             }
724             /* go to the next entry on the tmplt_copy */
725             if (i < tmpl_len)
726                 dsname = &tmplt_copy[i + 1];
727         }
728     }
729   out_free_tmpl_copy:
730     free(tmplt_copy);
731   out:
732     return ret;
733 }
734
735 /*
736  * Parse an update string, updates the primary data points (PDPs)
737  * and consolidated data points (CDPs), and writes changes to the RRAs.
738  *
739  * Returns 0 on success, -1 on error.
740  */
741 static int process_arg(
742     char *step_start,
743     rrd_t *rrd,
744     rrd_file_t *rrd_file,
745     unsigned long rra_begin,
746     time_t *current_time,
747     unsigned long *current_time_usec,
748     rrd_value_t *pdp_temp,
749     rrd_value_t *pdp_new,
750     unsigned long *rra_step_cnt,
751     char **updvals,
752     long *tmpl_idx,
753     unsigned long tmpl_cnt,
754     rrd_info_t ** pcdp_summary,
755     int version,
756     unsigned long *skip_update,
757     int *schedule_smooth)
758 {
759     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
760
761     /* a vector of future Holt-Winters seasonal coefs */
762     unsigned long elapsed_pdp_st;
763
764     double    interval, pre_int, post_int;  /* interval between this and
765                                              * the last run */
766     unsigned long proc_pdp_cnt;
767
768     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769                  current_time, current_time_usec, version) == -1) {
770         return -1;
771     }
772
773     interval = (double) (*current_time - rrd->live_head->last_up)
774         + (double) ((long) *current_time_usec -
775                     (long) rrd->live_head->last_up_usec) / 1e6f;
776
777     /* process the data sources and update the pdp_prep 
778      * area accordingly */
779     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
780         return -1;
781     }
782
783     elapsed_pdp_st = calculate_elapsed_steps(rrd,
784                                              *current_time,
785                                              *current_time_usec, interval,
786                                              &pre_int, &post_int,
787                                              &proc_pdp_cnt);
788
789     /* has a pdp_st moment occurred since the last run ? */
790     if (elapsed_pdp_st == 0) {
791         /* no we have not passed a pdp_st moment. therefore update is simple */
792         simple_update(rrd, interval, pdp_new);
793     } else {
794         /* an pdp_st has occurred. */
795         if (process_all_pdp_st(rrd, interval,
796                                pre_int, post_int,
797                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
798             return -1;
799         }
800         if (update_all_cdp_prep(rrd, rra_step_cnt,
801                                 rra_begin, rrd_file,
802                                 elapsed_pdp_st,
803                                 proc_pdp_cnt,
804                                 &last_seasonal_coef,
805                                 &seasonal_coef,
806                                 pdp_temp,
807                                 skip_update, schedule_smooth) == -1) {
808             goto err_free_coefficients;
809         }
810         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
811                                  elapsed_pdp_st, pdp_temp,
812                                  &seasonal_coef) == -1) {
813             goto err_free_coefficients;
814         }
815         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
816                           *current_time, skip_update,
817                           pcdp_summary) == -1) {
818             goto err_free_coefficients;
819         }
820     }                   /* endif a pdp_st has occurred */
821     rrd->live_head->last_up = *current_time;
822     rrd->live_head->last_up_usec = *current_time_usec;
823
824     if (version < 3) {
825         *rrd->legacy_last_up = rrd->live_head->last_up;
826     }
827     free(seasonal_coef);
828     free(last_seasonal_coef);
829     return 0;
830
831   err_free_coefficients:
832     free(seasonal_coef);
833     free(last_seasonal_coef);
834     return -1;
835 }
836
837 /*
838  * Parse a DS string (time + colon-separated values), storing the
839  * results in current_time, current_time_usec, and updvals.
840  *
841  * Returns 0 on success, -1 on error.
842  */
843 static int parse_ds(
844     rrd_t *rrd,
845     char **updvals,
846     long *tmpl_idx,
847     char *input,
848     unsigned long tmpl_cnt,
849     time_t *current_time,
850     unsigned long *current_time_usec,
851     int version)
852 {
853     char     *p;
854     unsigned long i;
855     char      timesyntax;
856
857     updvals[0] = input;
858     /* initialize all ds input to unknown except the first one
859        which has always got to be set */
860     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
861         updvals[i] = "U";
862
863     /* separate all ds elements; first must be examined separately
864        due to alternate time syntax */
865     if ((p = strchr(input, '@')) != NULL) {
866         timesyntax = '@';
867     } else if ((p = strchr(input, ':')) != NULL) {
868         timesyntax = ':';
869     } else {
870         rrd_set_error("expected timestamp not found in data source from %s",
871                       input);
872         return -1;
873     }
874     *p = '\0';
875     i = 1;
876     updvals[tmpl_idx[i++]] = p + 1;
877     while (*(++p)) {
878         if (*p == ':') {
879             *p = '\0';
880             if (i < tmpl_cnt) {
881                 updvals[tmpl_idx[i++]] = p + 1;
882             }
883         }
884     }
885
886     if (i != tmpl_cnt) {
887         rrd_set_error("expected %lu data source readings (got %lu) from %s",
888                       tmpl_cnt - 1, i, input);
889         return -1;
890     }
891
892     if (get_time_from_reading(rrd, timesyntax, updvals,
893                               current_time, current_time_usec,
894                               version) == -1) {
895         return -1;
896     }
897     return 0;
898 }
899
900 /*
901  * Parse the time in a DS string, store it in current_time and 
902  * current_time_usec and verify that it's later than the last
903  * update for this DS.
904  *
905  * Returns 0 on success, -1 on error.
906  */
907 static int get_time_from_reading(
908     rrd_t *rrd,
909     char timesyntax,
910     char **updvals,
911     time_t *current_time,
912     unsigned long *current_time_usec,
913     int version)
914 {
915     double    tmp;
916     char     *parsetime_error = NULL;
917     char     *old_locale;
918     rrd_time_value_t ds_tv;
919     struct timeval tmp_time;    /* used for time conversion */
920
921     /* get the time from the reading ... handle N */
922     if (timesyntax == '@') {    /* at-style */
923         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
924             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
925             return -1;
926         }
927         if (ds_tv.type == RELATIVE_TO_END_TIME ||
928             ds_tv.type == RELATIVE_TO_START_TIME) {
929             rrd_set_error("specifying time relative to the 'start' "
930                           "or 'end' makes no sense here: %s", updvals[0]);
931             return -1;
932         }
933         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
934         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
935     } else if (strcmp(updvals[0], "N") == 0) {
936         gettimeofday(&tmp_time, 0);
937         normalize_time(&tmp_time);
938         *current_time = tmp_time.tv_sec;
939         *current_time_usec = tmp_time.tv_usec;
940     } else {
941         old_locale = setlocale(LC_NUMERIC, "C");
942         tmp = strtod(updvals[0], 0);
943         setlocale(LC_NUMERIC, old_locale);
944         *current_time = floor(tmp);
945         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
946     }
947     /* dont do any correction for old version RRDs */
948     if (version < 3)
949         *current_time_usec = 0;
950
951     if (*current_time < rrd->live_head->last_up ||
952         (*current_time == rrd->live_head->last_up &&
953          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
954         rrd_set_error("illegal attempt to update using time %ld when "
955                       "last update time is %ld (minimum one second step)",
956                       *current_time, rrd->live_head->last_up);
957         return -1;
958     }
959     return 0;
960 }
961
962 /*
963  * Update pdp_new by interpreting the updvals according to the DS type
964  * (COUNTER, GAUGE, etc.).
965  *
966  * Returns 0 on success, -1 on error.
967  */
968 static int update_pdp_prep(
969     rrd_t *rrd,
970     char **updvals,
971     rrd_value_t *pdp_new,
972     double interval)
973 {
974     unsigned long ds_idx;
975     int       ii;
976     char     *endptr;   /* used in the conversion */
977     double    rate;
978     char     *old_locale;
979     enum dst_en dst_idx;
980
981     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
982         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
983
984         /* make sure we do not build diffs with old last_ds values */
985         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
986             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
987             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
988         }
989
990         /* NOTE: DST_CDEF should never enter this if block, because
991          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
992          * accidently specified a value for the DST_CDEF. To handle this case,
993          * an extra check is required. */
994
995         if ((updvals[ds_idx + 1][0] != 'U') &&
996             (dst_idx != DST_CDEF) &&
997             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
998             rate = DNAN;
999
1000             /* pdp_new contains rate * time ... eg the bytes transferred during
1001              * the interval. Doing it this way saves a lot of math operations
1002              */
1003             switch (dst_idx) {
1004             case DST_COUNTER:
1005             case DST_DERIVE:
1006                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1007                     if ((updvals[ds_idx + 1][ii] < '0'
1008                          || updvals[ds_idx + 1][ii] > '9')
1009                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1010                         rrd_set_error("not a simple integer: '%s'",
1011                                       updvals[ds_idx + 1]);
1012                         return -1;
1013                     }
1014                 }
1015                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1016                     pdp_new[ds_idx] =
1017                         rrd_diff(updvals[ds_idx + 1],
1018                                  rrd->pdp_prep[ds_idx].last_ds);
1019                     if (dst_idx == DST_COUNTER) {
1020                         /* simple overflow catcher. This will fail
1021                          * terribly for non 32 or 64 bit counters
1022                          * ... are there any others in SNMP land?
1023                          */
1024                         if (pdp_new[ds_idx] < (double) 0.0)
1025                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1026                         if (pdp_new[ds_idx] < (double) 0.0)
1027                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1028                     }
1029                     rate = pdp_new[ds_idx] / interval;
1030                 } else {
1031                     pdp_new[ds_idx] = DNAN;
1032                 }
1033                 break;
1034             case DST_ABSOLUTE:
1035                 old_locale = setlocale(LC_NUMERIC, "C");
1036                 errno = 0;
1037                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1038                 setlocale(LC_NUMERIC, old_locale);
1039                 if (errno > 0) {
1040                     rrd_set_error("converting '%s' to float: %s",
1041                                   updvals[ds_idx + 1], rrd_strerror(errno));
1042                     return -1;
1043                 };
1044                 if (endptr[0] != '\0') {
1045                     rrd_set_error
1046                         ("conversion of '%s' to float not complete: tail '%s'",
1047                          updvals[ds_idx + 1], endptr);
1048                     return -1;
1049                 }
1050                 rate = pdp_new[ds_idx] / interval;
1051                 break;
1052             case DST_GAUGE:
1053                 errno = 0;
1054                 old_locale = setlocale(LC_NUMERIC, "C");
1055                 pdp_new[ds_idx] =
1056                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1057                 setlocale(LC_NUMERIC, old_locale);
1058                 if (errno) {
1059                     rrd_set_error("converting '%s' to float: %s",
1060                                   updvals[ds_idx + 1], rrd_strerror(errno));
1061                     return -1;
1062                 };
1063                 if (endptr[0] != '\0') {
1064                     rrd_set_error
1065                         ("conversion of '%s' to float not complete: tail '%s'",
1066                          updvals[ds_idx + 1], endptr);
1067                     return -1;
1068                 }
1069                 rate = pdp_new[ds_idx] / interval;
1070                 break;
1071             default:
1072                 rrd_set_error("rrd contains unknown DS type : '%s'",
1073                               rrd->ds_def[ds_idx].dst);
1074                 return -1;
1075             }
1076             /* break out of this for loop if the error string is set */
1077             if (rrd_test_error()) {
1078                 return -1;
1079             }
1080             /* make sure pdp_temp is neither too large or too small
1081              * if any of these occur it becomes unknown ...
1082              * sorry folks ... */
1083             if (!isnan(rate) &&
1084                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1085                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1086                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1087                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1088                 pdp_new[ds_idx] = DNAN;
1089             }
1090         } else {
1091             /* no news is news all the same */
1092             pdp_new[ds_idx] = DNAN;
1093         }
1094
1095
1096         /* make a copy of the command line argument for the next run */
1097 #ifdef DEBUG
1098         fprintf(stderr, "prep ds[%lu]\t"
1099                 "last_arg '%s'\t"
1100                 "this_arg '%s'\t"
1101                 "pdp_new %10.2f\n",
1102                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1103                 pdp_new[ds_idx]);
1104 #endif
1105         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1106                 LAST_DS_LEN - 1);
1107         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1108     }
1109     return 0;
1110 }
1111
1112 /*
1113  * How many PDP steps have elapsed since the last update? Returns the answer,
1114  * and stores the time between the last update and the last PDP in pre_time,
1115  * and the time between the last PDP and the current time in post_int.
1116  */
1117 static int calculate_elapsed_steps(
1118     rrd_t *rrd,
1119     unsigned long current_time,
1120     unsigned long current_time_usec,
1121     double interval,
1122     double *pre_int,
1123     double *post_int,
1124     unsigned long *proc_pdp_cnt)
1125 {
1126     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1127     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1128                                  * time */
1129     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1130                                  * when it was last updated */
1131     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1132
1133     /* when was the current pdp started */
1134     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1135     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1136
1137     /* when did the last pdp_st occur */
1138     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1139     occu_pdp_st = current_time - occu_pdp_age;
1140
1141     if (occu_pdp_st > proc_pdp_st) {
1142         /* OK we passed the pdp_st moment */
1143         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1144                                                                      * occurred before the latest
1145                                                                      * pdp_st moment*/
1146         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1147         *post_int = occu_pdp_age;   /* how much after it */
1148         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1149     } else {
1150         *pre_int = interval;
1151         *post_int = 0;
1152     }
1153
1154     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1155
1156 #ifdef DEBUG
1157     printf("proc_pdp_age %lu\t"
1158            "proc_pdp_st %lu\t"
1159            "occu_pfp_age %lu\t"
1160            "occu_pdp_st %lu\t"
1161            "int %lf\t"
1162            "pre_int %lf\t"
1163            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1164            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1165 #endif
1166
1167     /* compute the number of elapsed pdp_st moments */
1168     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1169 }
1170
1171 /*
1172  * Increment the PDP values by the values in pdp_new, or else initialize them.
1173  */
1174 static void simple_update(
1175     rrd_t *rrd,
1176     double interval,
1177     rrd_value_t *pdp_new)
1178 {
1179     int       i;
1180
1181     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1182         if (isnan(pdp_new[i])) {
1183             /* this is not really accurate if we use subsecond data arrival time
1184                should have thought of it when going subsecond resolution ...
1185                sorry next format change we will have it! */
1186             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1187                 floor(interval);
1188         } else {
1189             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1190                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1191             } else {
1192                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1193             }
1194         }
1195 #ifdef DEBUG
1196         fprintf(stderr,
1197                 "NO PDP  ds[%i]\t"
1198                 "value %10.2f\t"
1199                 "unkn_sec %5lu\n",
1200                 i,
1201                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1202                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1203 #endif
1204     }
1205 }
1206
1207 /*
1208  * Call process_pdp_st for each DS.
1209  *
1210  * Returns 0 on success, -1 on error.
1211  */
1212 static int process_all_pdp_st(
1213     rrd_t *rrd,
1214     double interval,
1215     double pre_int,
1216     double post_int,
1217     unsigned long elapsed_pdp_st,
1218     rrd_value_t *pdp_new,
1219     rrd_value_t *pdp_temp)
1220 {
1221     unsigned long ds_idx;
1222
1223     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1224        rate*seconds which occurred up to the last run.
1225        pdp_new[] contains rate*seconds from the latest run.
1226        pdp_temp[] will contain the rate for cdp */
1227
1228     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1229         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1230                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1231                            pdp_new, pdp_temp) == -1) {
1232             return -1;
1233         }
1234 #ifdef DEBUG
1235         fprintf(stderr, "PDP UPD ds[%lu]\t"
1236                 "elapsed_pdp_st %lu\t"
1237                 "pdp_temp %10.2f\t"
1238                 "new_prep %10.2f\t"
1239                 "new_unkn_sec %5lu\n",
1240                 ds_idx,
1241                 elapsed_pdp_st,
1242                 pdp_temp[ds_idx],
1243                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1244                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1245 #endif
1246     }
1247     return 0;
1248 }
1249
1250 /*
1251  * Process an update that occurs after one of the PDP moments.
1252  * Increments the PDP value, sets NAN if time greater than the
1253  * heartbeats have elapsed, processes CDEFs.
1254  *
1255  * Returns 0 on success, -1 on error.
1256  */
1257 static int process_pdp_st(
1258     rrd_t *rrd,
1259     unsigned long ds_idx,
1260     double interval,
1261     double pre_int,
1262     double post_int,
1263     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1264     rrd_value_t *pdp_new,
1265     rrd_value_t *pdp_temp)
1266 {
1267     int       i;
1268
1269     /* update pdp_prep to the current pdp_st. */
1270     double    pre_unknown = 0.0;
1271     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1272     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1273
1274     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1275
1276     rpnstack_init(&rpnstack);
1277
1278
1279     if (isnan(pdp_new[ds_idx])) {
1280         /* a final bit of unknown to be added before calculation
1281            we use a temporary variable for this so that we
1282            don't have to turn integer lines before using the value */
1283         pre_unknown = pre_int;
1284     } else {
1285         if (isnan(scratch[PDP_val].u_val)) {
1286             scratch[PDP_val].u_val = 0;
1287         }
1288         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1289     }
1290
1291     /* if too much of the pdp_prep is unknown we dump it */
1292     /* if the interval is larger thatn mrhb we get NAN */
1293     if ((interval > mrhb) ||
1294         (rrd->stat_head->pdp_step / 2.0 <
1295          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1296         pdp_temp[ds_idx] = DNAN;
1297     } else {
1298         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1299             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1300              pre_unknown);
1301     }
1302
1303     /* process CDEF data sources; remember each CDEF DS can
1304      * only reference other DS with a lower index number */
1305     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1306         rpnp_t   *rpnp;
1307
1308         rpnp =
1309             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1310         /* substitute data values for OP_VARIABLE nodes */
1311         for (i = 0; rpnp[i].op != OP_END; i++) {
1312             if (rpnp[i].op == OP_VARIABLE) {
1313                 rpnp[i].op = OP_NUMBER;
1314                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1315             }
1316         }
1317         /* run the rpn calculator */
1318         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1319             free(rpnp);
1320             rpnstack_free(&rpnstack);
1321             return -1;
1322         }
1323     }
1324
1325     /* make pdp_prep ready for the next run */
1326     if (isnan(pdp_new[ds_idx])) {
1327         /* this is not realy accurate if we use subsecond data arival time
1328            should have thought of it when going subsecond resolution ...
1329            sorry next format change we will have it! */
1330         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1331         scratch[PDP_val].u_val = DNAN;
1332     } else {
1333         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1334         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1335     }
1336     rpnstack_free(&rpnstack);
1337     return 0;
1338 }
1339
1340 /*
1341  * Iterate over all the RRAs for a given DS and:
1342  * 1. Decide whether to schedule a smooth later
1343  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1344  * 3. Update the CDP
1345  *
1346  * Returns 0 on success, -1 on error
1347  */
1348 static int update_all_cdp_prep(
1349     rrd_t *rrd,
1350     unsigned long *rra_step_cnt,
1351     unsigned long rra_begin,
1352     rrd_file_t *rrd_file,
1353     unsigned long elapsed_pdp_st,
1354     unsigned long proc_pdp_cnt,
1355     rrd_value_t **last_seasonal_coef,
1356     rrd_value_t **seasonal_coef,
1357     rrd_value_t *pdp_temp,
1358     unsigned long *skip_update,
1359     int *schedule_smooth)
1360 {
1361     unsigned long rra_idx;
1362
1363     /* index into the CDP scratch array */
1364     enum cf_en current_cf;
1365     unsigned long rra_start;
1366
1367     /* number of rows to be updated in an RRA for a data value. */
1368     unsigned long start_pdp_offset;
1369
1370     rra_start = rra_begin;
1371     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1372         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1373         start_pdp_offset =
1374             rrd->rra_def[rra_idx].pdp_cnt -
1375             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1376         skip_update[rra_idx] = 0;
1377         if (start_pdp_offset <= elapsed_pdp_st) {
1378             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1379                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1380         } else {
1381             rra_step_cnt[rra_idx] = 0;
1382         }
1383
1384         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1385             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1386              * so that they will be correct for the next observed value; note that for
1387              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1388              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1389             if (rra_step_cnt[rra_idx] > 1) {
1390                 skip_update[rra_idx] = 1;
1391                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1392                                 elapsed_pdp_st, last_seasonal_coef);
1393                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1394                                 elapsed_pdp_st + 1, seasonal_coef);
1395             }
1396             /* periodically run a smoother for seasonal effects */
1397             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1398 #ifdef DEBUG
1399                 fprintf(stderr,
1400                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1401                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1402                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1403                         u_cnt);
1404 #endif
1405                 *schedule_smooth = 1;
1406             }
1407         }
1408         if (rrd_test_error())
1409             return -1;
1410
1411         if (update_cdp_prep
1412             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1413              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1414              current_cf) == -1) {
1415             return -1;
1416         }
1417         rra_start +=
1418             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1419             sizeof(rrd_value_t);
1420     }
1421     return 0;
1422 }
1423
1424 /* 
1425  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1426  */
1427 static int do_schedule_smooth(
1428     rrd_t *rrd,
1429     unsigned long rra_idx,
1430     unsigned long elapsed_pdp_st)
1431 {
1432     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1433     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1434     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1435     unsigned long seasonal_smooth_idx =
1436         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1437     unsigned long *init_seasonal =
1438         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1439
1440     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1441      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1442      * really an RRA level, not a data source within RRA level parameter, but
1443      * the rra_def is read only for rrd_update (not flushed to disk). */
1444     if (*init_seasonal > BURNIN_CYCLES) {
1445         /* someone has no doubt invented a trick to deal with this wrap around,
1446          * but at least this code is clear. */
1447         if (seasonal_smooth_idx > cur_row) {
1448             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1449              * between PDP and CDP */
1450             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1451         }
1452         /* can't rely on negative numbers because we are working with
1453          * unsigned values */
1454         return (cur_row + elapsed_pdp_st >= row_cnt
1455                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1456     }
1457     /* mark off one of the burn-in cycles */
1458     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1459 }
1460
1461 /*
1462  * For a given RRA, iterate over the data sources and call the appropriate
1463  * consolidation function.
1464  *
1465  * Returns 0 on success, -1 on error.
1466  */
1467 static int update_cdp_prep(
1468     rrd_t *rrd,
1469     unsigned long elapsed_pdp_st,
1470     unsigned long start_pdp_offset,
1471     unsigned long *rra_step_cnt,
1472     int rra_idx,
1473     rrd_value_t *pdp_temp,
1474     rrd_value_t *last_seasonal_coef,
1475     rrd_value_t *seasonal_coef,
1476     int current_cf)
1477 {
1478     unsigned long ds_idx, cdp_idx;
1479
1480     /* update CDP_PREP areas */
1481     /* loop over data soures within each RRA */
1482     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1483
1484         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1485
1486         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1487             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1488                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1489                        elapsed_pdp_st, start_pdp_offset,
1490                        rrd->rra_def[rra_idx].pdp_cnt,
1491                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1492                        rra_idx, ds_idx);
1493         } else {
1494             /* Nothing to consolidate if there's one PDP per CDP. However, if
1495              * we've missed some PDPs, let's update null counters etc. */
1496             if (elapsed_pdp_st > 2) {
1497                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1498                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1499                           current_cf);
1500             }
1501         }
1502
1503         if (rrd_test_error())
1504             return -1;
1505     }                   /* endif data sources loop */
1506     return 0;
1507 }
1508
1509 /*
1510  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1511  * primary value, secondary value, and # of unknowns.
1512  */
1513 static void update_cdp(
1514     unival *scratch,
1515     int current_cf,
1516     rrd_value_t pdp_temp_val,
1517     unsigned long rra_step_cnt,
1518     unsigned long elapsed_pdp_st,
1519     unsigned long start_pdp_offset,
1520     unsigned long pdp_cnt,
1521     rrd_value_t xff,
1522     int i,
1523     int ii)
1524 {
1525     /* shorthand variables */
1526     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1527     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1528     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1529     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1530
1531     if (rra_step_cnt) {
1532         /* If we are in this block, as least 1 CDP value will be written to
1533          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1534          * to be written, then the "fill in" value is the CDP_secondary_val
1535          * entry. */
1536         if (isnan(pdp_temp_val)) {
1537             *cdp_unkn_pdp_cnt += start_pdp_offset;
1538             *cdp_secondary_val = DNAN;
1539         } else {
1540             /* CDP_secondary value is the RRA "fill in" value for intermediary
1541              * CDP data entries. No matter the CF, the value is the same because
1542              * the average, max, min, and last of a list of identical values is
1543              * the same, namely, the value itself. */
1544             *cdp_secondary_val = pdp_temp_val;
1545         }
1546
1547         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1548             *cdp_primary_val = DNAN;
1549             if (current_cf == CF_AVERAGE) {
1550                 *cdp_val =
1551                     initialize_average_carry_over(pdp_temp_val,
1552                                                   elapsed_pdp_st,
1553                                                   start_pdp_offset, pdp_cnt);
1554             } else {
1555                 *cdp_val = pdp_temp_val;
1556             }
1557         } else {
1558             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1559                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1560         }               /* endif meets xff value requirement for a valid value */
1561         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1562          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1563         if (isnan(pdp_temp_val))
1564             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1565         else
1566             *cdp_unkn_pdp_cnt = 0;
1567     } else {            /* rra_step_cnt[i]  == 0 */
1568
1569 #ifdef DEBUG
1570         if (isnan(*cdp_val)) {
1571             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1572                     i, ii);
1573         } else {
1574             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1575                     i, ii, *cdp_val);
1576         }
1577 #endif
1578         if (isnan(pdp_temp_val)) {
1579             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1580         } else {
1581             *cdp_val =
1582                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1583                                   current_cf, i, ii);
1584         }
1585     }
1586 }
1587
1588 /*
1589  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1590  * on the type of consolidation function.
1591  */
1592 static void initialize_cdp_val(
1593     unival *scratch,
1594     int current_cf,
1595     rrd_value_t pdp_temp_val,
1596     unsigned long elapsed_pdp_st,
1597     unsigned long start_pdp_offset,
1598     unsigned long pdp_cnt)
1599 {
1600     rrd_value_t cum_val, cur_val;
1601
1602     switch (current_cf) {
1603     case CF_AVERAGE:
1604         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1605         cur_val = IFDNAN(pdp_temp_val, 0.0);
1606         scratch[CDP_primary_val].u_val =
1607             (cum_val + cur_val * start_pdp_offset) /
1608             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1609         scratch[CDP_val].u_val =
1610             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1611                                           start_pdp_offset, pdp_cnt);
1612         break;
1613     case CF_MAXIMUM:
1614         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1615         cur_val = IFDNAN(pdp_temp_val, -DINF);
1616 #if 0
1617 #ifdef DEBUG
1618         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1619             fprintf(stderr,
1620                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1621                     i, ii);
1622             exit(-1);
1623         }
1624 #endif
1625 #endif
1626         if (cur_val > cum_val)
1627             scratch[CDP_primary_val].u_val = cur_val;
1628         else
1629             scratch[CDP_primary_val].u_val = cum_val;
1630         /* initialize carry over value */
1631         scratch[CDP_val].u_val = pdp_temp_val;
1632         break;
1633     case CF_MINIMUM:
1634         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1635         cur_val = IFDNAN(pdp_temp_val, DINF);
1636 #if 0
1637 #ifdef DEBUG
1638         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1639             fprintf(stderr,
1640                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1641                     ii);
1642             exit(-1);
1643         }
1644 #endif
1645 #endif
1646         if (cur_val < cum_val)
1647             scratch[CDP_primary_val].u_val = cur_val;
1648         else
1649             scratch[CDP_primary_val].u_val = cum_val;
1650         /* initialize carry over value */
1651         scratch[CDP_val].u_val = pdp_temp_val;
1652         break;
1653     case CF_LAST:
1654     default:
1655         scratch[CDP_primary_val].u_val = pdp_temp_val;
1656         /* initialize carry over value */
1657         scratch[CDP_val].u_val = pdp_temp_val;
1658         break;
1659     }
1660 }
1661
1662 /*
1663  * Update the consolidation function for Holt-Winters functions as
1664  * well as other functions that don't actually consolidate multiple
1665  * PDPs.
1666  */
1667 static void reset_cdp(
1668     rrd_t *rrd,
1669     unsigned long elapsed_pdp_st,
1670     rrd_value_t *pdp_temp,
1671     rrd_value_t *last_seasonal_coef,
1672     rrd_value_t *seasonal_coef,
1673     int rra_idx,
1674     int ds_idx,
1675     int cdp_idx,
1676     enum cf_en current_cf)
1677 {
1678     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1679
1680     switch (current_cf) {
1681     case CF_AVERAGE:
1682     default:
1683         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1684         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1685         break;
1686     case CF_SEASONAL:
1687     case CF_DEVSEASONAL:
1688         /* need to update cached seasonal values, so they are consistent
1689          * with the bulk update */
1690         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1691          * CDP_last_deviation are the same. */
1692         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1693         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1694         break;
1695     case CF_HWPREDICT:
1696     case CF_MHWPREDICT:
1697         /* need to update the null_count and last_null_count.
1698          * even do this for non-DNAN pdp_temp because the
1699          * algorithm is not learning from batch updates. */
1700         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1701         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1702         /* fall through */
1703     case CF_DEVPREDICT:
1704         scratch[CDP_primary_val].u_val = DNAN;
1705         scratch[CDP_secondary_val].u_val = DNAN;
1706         break;
1707     case CF_FAILURES:
1708         /* do not count missed bulk values as failures */
1709         scratch[CDP_primary_val].u_val = 0;
1710         scratch[CDP_secondary_val].u_val = 0;
1711         /* need to reset violations buffer.
1712          * could do this more carefully, but for now, just
1713          * assume a bulk update wipes away all violations. */
1714         erase_violations(rrd, cdp_idx, rra_idx);
1715         break;
1716     }
1717 }
1718
1719 static rrd_value_t initialize_average_carry_over(
1720     rrd_value_t pdp_temp_val,
1721     unsigned long elapsed_pdp_st,
1722     unsigned long start_pdp_offset,
1723     unsigned long pdp_cnt)
1724 {
1725     /* initialize carry over value */
1726     if (isnan(pdp_temp_val)) {
1727         return DNAN;
1728     }
1729     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1730 }
1731
1732 /*
1733  * Update or initialize a CDP value based on the consolidation
1734  * function.
1735  *
1736  * Returns the new value.
1737  */
1738 static rrd_value_t calculate_cdp_val(
1739     rrd_value_t cdp_val,
1740     rrd_value_t pdp_temp_val,
1741     unsigned long elapsed_pdp_st,
1742     int current_cf,
1743 #ifdef DEBUG
1744     int i,
1745     int ii
1746 #else
1747     int UNUSED(i),
1748     int UNUSED(ii)
1749 #endif
1750     )
1751 {
1752     if (isnan(cdp_val)) {
1753         if (current_cf == CF_AVERAGE) {
1754             pdp_temp_val *= elapsed_pdp_st;
1755         }
1756 #ifdef DEBUG
1757         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1758                 i, ii, pdp_temp_val);
1759 #endif
1760         return pdp_temp_val;
1761     }
1762     if (current_cf == CF_AVERAGE)
1763         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1764     if (current_cf == CF_MINIMUM)
1765         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1766     if (current_cf == CF_MAXIMUM)
1767         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1768
1769     return pdp_temp_val;
1770 }
1771
1772 /*
1773  * For each RRA, update the seasonal values and then call update_aberrant_CF
1774  * for each data source.
1775  *
1776  * Return 0 on success, -1 on error.
1777  */
1778 static int update_aberrant_cdps(
1779     rrd_t *rrd,
1780     rrd_file_t *rrd_file,
1781     unsigned long rra_begin,
1782     unsigned long elapsed_pdp_st,
1783     rrd_value_t *pdp_temp,
1784     rrd_value_t **seasonal_coef)
1785 {
1786     unsigned long rra_idx, ds_idx, j;
1787
1788     /* number of PDP steps since the last update that
1789      * are assigned to the first CDP to be generated
1790      * since the last update. */
1791     unsigned short scratch_idx;
1792     unsigned long rra_start;
1793     enum cf_en current_cf;
1794
1795     /* this loop is only entered if elapsed_pdp_st < 3 */
1796     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1797          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1798         rra_start = rra_begin;
1799         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1800             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1801                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1802                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1803                     if (scratch_idx == CDP_primary_val) {
1804                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1805                                         elapsed_pdp_st + 1, seasonal_coef);
1806                     } else {
1807                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1808                                         elapsed_pdp_st + 2, seasonal_coef);
1809                     }
1810                 }
1811                 if (rrd_test_error())
1812                     return -1;
1813                 /* loop over data soures within each RRA */
1814                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1815                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1816                                        rra_idx * (rrd->stat_head->ds_cnt) +
1817                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1818                                        *seasonal_coef);
1819                 }
1820             }
1821             rra_start += rrd->rra_def[rra_idx].row_cnt
1822                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1823         }
1824     }
1825     return 0;
1826 }
1827
1828 /* 
1829  * Move sequentially through the file, writing one RRA at a time.  Note this
1830  * architecture divorces the computation of CDP with flushing updated RRA
1831  * entries to disk.
1832  *
1833  * Return 0 on success, -1 on error.
1834  */
1835 static int write_to_rras(
1836     rrd_t *rrd,
1837     rrd_file_t *rrd_file,
1838     unsigned long *rra_step_cnt,
1839     unsigned long rra_begin,
1840     time_t current_time,
1841     unsigned long *skip_update,
1842     rrd_info_t ** pcdp_summary)
1843 {
1844     unsigned long rra_idx;
1845     unsigned long rra_start;
1846     time_t    rra_time = 0; /* time of update for a RRA */
1847
1848     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1849     
1850     /* Ready to write to disk */
1851     rra_start = rra_begin;
1852
1853     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1854         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1855         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1856
1857         /* for cdp_prep */
1858         unsigned short scratch_idx;
1859         unsigned long step_subtract;
1860
1861         for (scratch_idx = CDP_primary_val,
1862                  step_subtract = 1;
1863              rra_step_cnt[rra_idx] > 0;
1864              rra_step_cnt[rra_idx]--,
1865                  scratch_idx = CDP_secondary_val,
1866                  step_subtract = 2) {
1867
1868             off_t rra_pos_new;
1869 #ifdef DEBUG
1870             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1871 #endif
1872             /* increment, with wrap-around */
1873             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1874               rra_ptr->cur_row = 0;
1875
1876             /* we know what our position should be */
1877             rra_pos_new = rra_start
1878               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1879
1880             /* re-seek if the position is wrong or we wrapped around */
1881             if (rra_pos_new != rrd_file->pos) {
1882                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1883                     rrd_set_error("seek error in rrd");
1884                     return -1;
1885                 }
1886             }
1887 #ifdef DEBUG
1888             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1889 #endif
1890
1891             if (skip_update[rra_idx])
1892                 continue;
1893
1894             if (*pcdp_summary != NULL) {
1895                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1896
1897                 rra_time = (current_time - current_time % step_time)
1898                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1899             }
1900
1901             if (write_RRA_row
1902                 (rrd_file, rrd, rra_idx, scratch_idx,
1903                  pcdp_summary, rra_time) == -1)
1904                 return -1;
1905         }
1906
1907         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1908     } /* RRA LOOP */
1909
1910     return 0;
1911 }
1912
1913 /*
1914  * Write out one row of values (one value per DS) to the archive.
1915  *
1916  * Returns 0 on success, -1 on error.
1917  */
1918 static int write_RRA_row(
1919     rrd_file_t *rrd_file,
1920     rrd_t *rrd,
1921     unsigned long rra_idx,
1922     unsigned short CDP_scratch_idx,
1923     rrd_info_t ** pcdp_summary,
1924     time_t rra_time)
1925 {
1926     unsigned long ds_idx, cdp_idx;
1927     rrd_infoval_t iv;
1928
1929     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1930         /* compute the cdp index */
1931         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1932 #ifdef DEBUG
1933         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1934                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1935                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1936 #endif
1937         if (*pcdp_summary != NULL) {
1938             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1939             /* append info to the return hash */
1940             *pcdp_summary = rrd_info_push(*pcdp_summary,
1941                                           sprintf_alloc
1942                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1943                                            rrd->rra_def[rra_idx].cf_nam,
1944                                            rrd->rra_def[rra_idx].pdp_cnt,
1945                                            rrd->ds_def[ds_idx].ds_nam),
1946                                           RD_I_VAL, iv);
1947         }
1948         if (rrd_write(rrd_file,
1949                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1950                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1951             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1952             return -1;
1953         }
1954     }
1955     return 0;
1956 }
1957
1958 /*
1959  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1960  *
1961  * Returns 0 on success, -1 otherwise
1962  */
1963 static int smooth_all_rras(
1964     rrd_t *rrd,
1965     rrd_file_t *rrd_file,
1966     unsigned long rra_begin)
1967 {
1968     unsigned long rra_start = rra_begin;
1969     unsigned long rra_idx;
1970
1971     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1972         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1973             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1974 #ifdef DEBUG
1975             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1976 #endif
1977             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1978             if (rrd_test_error())
1979                 return -1;
1980         }
1981         rra_start += rrd->rra_def[rra_idx].row_cnt
1982             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1983     }
1984     return 0;
1985 }
1986
1987 #ifndef HAVE_MMAP
1988 /*
1989  * Flush changes to disk (unless we're using mmap)
1990  *
1991  * Returns 0 on success, -1 otherwise
1992  */
1993 static int write_changes_to_disk(
1994     rrd_t *rrd,
1995     rrd_file_t *rrd_file,
1996     int version)
1997 {
1998     /* we just need to write back the live header portion now */
1999     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2000                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2001                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2002                  SEEK_SET) != 0) {
2003         rrd_set_error("seek rrd for live header writeback");
2004         return -1;
2005     }
2006     if (version >= 3) {
2007         if (rrd_write(rrd_file, rrd->live_head,
2008                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2009             rrd_set_error("rrd_write live_head to rrd");
2010             return -1;
2011         }
2012     } else {
2013         if (rrd_write(rrd_file, rrd->legacy_last_up,
2014                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2015             rrd_set_error("rrd_write live_head to rrd");
2016             return -1;
2017         }
2018     }
2019
2020
2021     if (rrd_write(rrd_file, rrd->pdp_prep,
2022                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2023         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2024         rrd_set_error("rrd_write pdp_prep to rrd");
2025         return -1;
2026     }
2027
2028     if (rrd_write(rrd_file, rrd->cdp_prep,
2029                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2030                   rrd->stat_head->ds_cnt)
2031         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2032                       rrd->stat_head->ds_cnt)) {
2033
2034         rrd_set_error("rrd_write cdp_prep to rrd");
2035         return -1;
2036     }
2037
2038     if (rrd_write(rrd_file, rrd->rra_ptr,
2039                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2040         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2041         rrd_set_error("rrd_write rra_ptr to rrd");
2042         return -1;
2043     }
2044     return 0;
2045 }
2046 #endif