This patch introduces a feature whereby rrdcached will disallow updates
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     time_t *current_time,
102     unsigned long *current_time_usec,
103     rrd_value_t *pdp_temp,
104     rrd_value_t *pdp_new,
105     unsigned long *rra_step_cnt,
106     char **updvals,
107     long *tmpl_idx,
108     unsigned long tmpl_cnt,
109     rrd_info_t ** pcdp_summary,
110     int version,
111     unsigned long *skip_update,
112     int *schedule_smooth);
113
114 static int parse_ds(
115     rrd_t *rrd,
116     char **updvals,
117     long *tmpl_idx,
118     char *input,
119     unsigned long tmpl_cnt,
120     time_t *current_time,
121     unsigned long *current_time_usec,
122     int version);
123
124 static int get_time_from_reading(
125     rrd_t *rrd,
126     char timesyntax,
127     char **updvals,
128     time_t *current_time,
129     unsigned long *current_time_usec,
130     int version);
131
132 static int update_pdp_prep(
133     rrd_t *rrd,
134     char **updvals,
135     rrd_value_t *pdp_new,
136     double interval);
137
138 static int calculate_elapsed_steps(
139     rrd_t *rrd,
140     unsigned long current_time,
141     unsigned long current_time_usec,
142     double interval,
143     double *pre_int,
144     double *post_int,
145     unsigned long *proc_pdp_cnt);
146
147 static void simple_update(
148     rrd_t *rrd,
149     double interval,
150     rrd_value_t *pdp_new);
151
152 static int process_all_pdp_st(
153     rrd_t *rrd,
154     double interval,
155     double pre_int,
156     double post_int,
157     unsigned long elapsed_pdp_st,
158     rrd_value_t *pdp_new,
159     rrd_value_t *pdp_temp);
160
161 static int process_pdp_st(
162     rrd_t *rrd,
163     unsigned long ds_idx,
164     double interval,
165     double pre_int,
166     double post_int,
167     long diff_pdp_st,
168     rrd_value_t *pdp_new,
169     rrd_value_t *pdp_temp);
170
171 static int update_all_cdp_prep(
172     rrd_t *rrd,
173     unsigned long *rra_step_cnt,
174     unsigned long rra_begin,
175     rrd_file_t *rrd_file,
176     unsigned long elapsed_pdp_st,
177     unsigned long proc_pdp_cnt,
178     rrd_value_t **last_seasonal_coef,
179     rrd_value_t **seasonal_coef,
180     rrd_value_t *pdp_temp,
181     unsigned long *skip_update,
182     int *schedule_smooth);
183
184 static int do_schedule_smooth(
185     rrd_t *rrd,
186     unsigned long rra_idx,
187     unsigned long elapsed_pdp_st);
188
189 static int update_cdp_prep(
190     rrd_t *rrd,
191     unsigned long elapsed_pdp_st,
192     unsigned long start_pdp_offset,
193     unsigned long *rra_step_cnt,
194     int rra_idx,
195     rrd_value_t *pdp_temp,
196     rrd_value_t *last_seasonal_coef,
197     rrd_value_t *seasonal_coef,
198     int current_cf);
199
200 static void update_cdp(
201     unival *scratch,
202     int current_cf,
203     rrd_value_t pdp_temp_val,
204     unsigned long rra_step_cnt,
205     unsigned long elapsed_pdp_st,
206     unsigned long start_pdp_offset,
207     unsigned long pdp_cnt,
208     rrd_value_t xff,
209     int i,
210     int ii);
211
212 static void initialize_cdp_val(
213     unival *scratch,
214     int current_cf,
215     rrd_value_t pdp_temp_val,
216     unsigned long elapsed_pdp_st,
217     unsigned long start_pdp_offset,
218     unsigned long pdp_cnt);
219
220 static void reset_cdp(
221     rrd_t *rrd,
222     unsigned long elapsed_pdp_st,
223     rrd_value_t *pdp_temp,
224     rrd_value_t *last_seasonal_coef,
225     rrd_value_t *seasonal_coef,
226     int rra_idx,
227     int ds_idx,
228     int cdp_idx,
229     enum cf_en current_cf);
230
231 static rrd_value_t initialize_average_carry_over(
232     rrd_value_t pdp_temp_val,
233     unsigned long elapsed_pdp_st,
234     unsigned long start_pdp_offset,
235     unsigned long pdp_cnt);
236
237 static rrd_value_t calculate_cdp_val(
238     rrd_value_t cdp_val,
239     rrd_value_t pdp_temp_val,
240     unsigned long elapsed_pdp_st,
241     int current_cf,
242     int i,
243     int ii);
244
245 static int update_aberrant_cdps(
246     rrd_t *rrd,
247     rrd_file_t *rrd_file,
248     unsigned long rra_begin,
249     unsigned long elapsed_pdp_st,
250     rrd_value_t *pdp_temp,
251     rrd_value_t **seasonal_coef);
252
253 static int write_to_rras(
254     rrd_t *rrd,
255     rrd_file_t *rrd_file,
256     unsigned long *rra_step_cnt,
257     unsigned long rra_begin,
258     time_t current_time,
259     unsigned long *skip_update,
260     rrd_info_t ** pcdp_summary);
261
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned short CDP_scratch_idx,
267     rrd_info_t ** pcdp_summary,
268     time_t rra_time);
269
270 static int smooth_all_rras(
271     rrd_t *rrd,
272     rrd_file_t *rrd_file,
273     unsigned long rra_begin);
274
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277     rrd_t *rrd,
278     rrd_file_t *rrd_file,
279     int version);
280 #endif
281
282 /*
283  * normalize time as returned by gettimeofday. usec part must
284  * be always >= 0
285  */
286 static inline void normalize_time(
287     struct timeval *t)
288 {
289     if (t->tv_usec < 0) {
290         t->tv_sec--;
291         t->tv_usec += 1e6L;
292     }
293 }
294
295 /*
296  * Sets current_time and current_time_usec based on the current time.
297  * current_time_usec is set to 0 if the version number is 1 or 2.
298  */
299 static inline void initialize_time(
300     time_t *current_time,
301     unsigned long *current_time_usec,
302     int version)
303 {
304     struct timeval tmp_time;    /* used for time conversion */
305
306     gettimeofday(&tmp_time, 0);
307     normalize_time(&tmp_time);
308     *current_time = tmp_time.tv_sec;
309     if (version >= 3) {
310         *current_time_usec = tmp_time.tv_usec;
311     } else {
312         *current_time_usec = 0;
313     }
314 }
315
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317
318 rrd_info_t *rrd_update_v(
319     int argc,
320     char **argv)
321 {
322     char     *tmplt = NULL;
323     rrd_info_t *result = NULL;
324     rrd_infoval_t rc;
325     char *opt_daemon = NULL;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
330
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
334
335     while (1) {
336         int       option_index = 0;
337         int       opt;
338
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340
341         if (opt == EOF)
342             break;
343
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
348
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
354
355     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356     if (opt_daemon != NULL) {
357         rrd_set_error ("The \"%s\" environment variable is defined, "
358                 "but \"%s\" cannot work with rrdcached. Either unset "
359                 "the environment variable or use \"update\" instead.",
360                 ENV_RRDCACHED_ADDRESS, argv[0]);
361         goto end_tag;
362     }
363
364     /* need at least 2 arguments: filename, data. */
365     if (argc - optind < 2) {
366         rrd_set_error("Not enough arguments");
367         goto end_tag;
368     }
369     rc.u_int = 0;
370     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371     rc.u_int = _rrd_update(argv[optind], tmplt,
372                            argc - optind - 1,
373                            (const char **) (argv + optind + 1), result);
374     result->value.u_int = rc.u_int;
375   end_tag:
376     return result;
377 }
378
379 int rrd_update(
380     int argc,
381     char **argv)
382 {
383     struct option long_options[] = {
384         {"template", required_argument, 0, 't'},
385         {"daemon",   required_argument, 0, 'd'},
386         {0, 0, 0, 0}
387     };
388     int       option_index = 0;
389     int       opt;
390     char     *tmplt = NULL;
391     int       rc = -1;
392     char     *opt_daemon = NULL;
393
394     optind = 0;
395     opterr = 0;         /* initialize getopt */
396
397     while (1) {
398         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399
400         if (opt == EOF)
401             break;
402
403         switch (opt) {
404         case 't':
405             tmplt = strdup(optarg);
406             break;
407
408         case 'd':
409             if (opt_daemon != NULL)
410                 free (opt_daemon);
411             opt_daemon = strdup (optarg);
412             if (opt_daemon == NULL)
413             {
414                 rrd_set_error("strdup failed.");
415                 goto out;
416             }
417             break;
418
419         case '?':
420             rrd_set_error("unknown option '%s'", argv[optind - 1]);
421             goto out;
422         }
423     }
424
425     /* need at least 2 arguments: filename, data. */
426     if (argc - optind < 2) {
427         rrd_set_error("Not enough arguments");
428         goto out;
429     }
430
431     {   /* try to connect to rrdcached */
432         int status = rrdc_connect(opt_daemon);
433         if (status != 0) return status;
434     }
435
436     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437     {
438         rrd_set_error("The caching daemon cannot be used together with "
439                 "templates yet.");
440         goto out;
441     }
442
443     if (! rrdc_is_connected(opt_daemon))
444     {
445       rc = rrd_update_r(argv[optind], tmplt,
446                         argc - optind - 1, (const char **) (argv + optind + 1));
447     }
448     else /* we are connected */
449     {
450         rc = rrdc_update (argv[optind], /* file */
451                           argc - optind - 1, /* values_num */
452                           (void *) (argv + optind + 1)); /* values */
453         if (rc > 0)
454             rrd_set_error("Failed sending the values to rrdcached: %s",
455                           rrd_strerror (rc));
456     }
457
458   out:
459     if (tmplt != NULL)
460     {
461         free(tmplt);
462         tmplt = NULL;
463     }
464     if (opt_daemon != NULL)
465     {
466         free (opt_daemon);
467         opt_daemon = NULL;
468     }
469     return rc;
470 }
471
472 int rrd_update_r(
473     const char *filename,
474     const char *tmplt,
475     int argc,
476     const char **argv)
477 {
478     return _rrd_update(filename, tmplt, argc, argv, NULL);
479 }
480
481 int _rrd_update(
482     const char *filename,
483     const char *tmplt,
484     int argc,
485     const char **argv,
486     rrd_info_t * pcdp_summary)
487 {
488
489     int       arg_i = 2;
490
491     unsigned long rra_begin;    /* byte pointer to the rra
492                                  * area in the rrd file.  this
493                                  * pointer never changes value */
494     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
495                              * to the existing entry */
496     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
497                              * to the cdp values */
498
499     long     *tmpl_idx; /* index representing the settings
500                          * transported by the tmplt index */
501     unsigned long tmpl_cnt = 2; /* time and data */
502     rrd_t     rrd;
503     time_t    current_time = 0;
504     unsigned long current_time_usec = 0;    /* microseconds part of current time */
505     char    **updvals;
506     int       schedule_smooth = 0;
507
508     /* number of elapsed PDP steps since last update */
509     unsigned long *rra_step_cnt = NULL;
510
511     int       version;  /* rrd version */
512     rrd_file_t *rrd_file;
513     char     *arg_copy; /* for processing the argv */
514     unsigned long *skip_update; /* RRAs to advance but not write */
515
516     /* need at least 1 arguments: data. */
517     if (argc < 1) {
518         rrd_set_error("Not enough arguments");
519         goto err_out;
520     }
521
522     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
523         goto err_free;
524     }
525     /* We are now at the beginning of the rra's */
526     rra_begin = rrd_file->header_len;
527
528     version = atoi(rrd.stat_head->version);
529
530     initialize_time(&current_time, &current_time_usec, version);
531
532     /* get exclusive lock to whole file.
533      * lock gets removed when we close the file.
534      */
535     if (rrd_lock(rrd_file) != 0) {
536         rrd_set_error("could not lock RRD");
537         goto err_close;
538     }
539
540     if (allocate_data_structures(&rrd, &updvals,
541                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542                                  &rra_step_cnt, &skip_update,
543                                  &pdp_new) == -1) {
544         goto err_close;
545     }
546
547     /* loop through the arguments. */
548     for (arg_i = 0; arg_i < argc; arg_i++) {
549         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550             rrd_set_error("failed duplication argv entry");
551             break;
552         }
553         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554                         &current_time, &current_time_usec, pdp_temp, pdp_new,
555                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556                         &pcdp_summary, version, skip_update,
557                         &schedule_smooth) == -1) {
558             if (rrd_test_error()) { /* Should have error string always here */
559                 char     *save_error;
560
561                 /* Prepend file name to error message */
562                 if ((save_error = strdup(rrd_get_error())) != NULL) {
563                     rrd_set_error("%s: %s", filename, save_error);
564                     free(save_error);
565                 }
566             }
567             free(arg_copy);
568             break;
569         }
570         free(arg_copy);
571     }
572
573     free(rra_step_cnt);
574
575     /* if we got here and if there is an error and if the file has not been
576      * written to, then close things up and return. */
577     if (rrd_test_error()) {
578         goto err_free_structures;
579     }
580 #ifndef HAVE_MMAP
581     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582         goto err_free_structures;
583     }
584 #endif
585
586     /* calling the smoothing code here guarantees at most one smoothing
587      * operation per rrd_update call. Unfortunately, it is possible with bulk
588      * updates, or a long-delayed update for smoothing to occur off-schedule.
589      * This really isn't critical except during the burn-in cycles. */
590     if (schedule_smooth) {
591         smooth_all_rras(&rrd, rrd_file, rra_begin);
592     }
593
594 /*    rrd_dontneed(rrd_file,&rrd); */
595     rrd_free(&rrd);
596     rrd_close(rrd_file);
597
598     free(pdp_new);
599     free(tmpl_idx);
600     free(pdp_temp);
601     free(skip_update);
602     free(updvals);
603     return 0;
604
605   err_free_structures:
606     free(pdp_new);
607     free(tmpl_idx);
608     free(pdp_temp);
609     free(skip_update);
610     free(updvals);
611   err_close:
612     rrd_close(rrd_file);
613   err_free:
614     rrd_free(&rrd);
615   err_out:
616     return -1;
617 }
618
619 /*
620  * get exclusive lock to whole file.
621  * lock gets removed when we close the file
622  *
623  * returns 0 on success
624  */
625 int rrd_lock(
626     rrd_file_t *file)
627 {
628     int       rcstat;
629
630     {
631 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
632         struct _stat st;
633
634         if (_fstat(file->fd, &st) == 0) {
635             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
636         } else {
637             rcstat = -1;
638         }
639 #else
640         struct flock lock;
641
642         lock.l_type = F_WRLCK;  /* exclusive write lock */
643         lock.l_len = 0; /* whole file */
644         lock.l_start = 0;   /* start of file */
645         lock.l_whence = SEEK_SET;   /* end of file */
646
647         rcstat = fcntl(file->fd, F_SETLK, &lock);
648 #endif
649     }
650
651     return (rcstat);
652 }
653
654 /*
655  * Allocate some important arrays used, and initialize the template.
656  *
657  * When it returns, either all of the structures are allocated
658  * or none of them are.
659  *
660  * Returns 0 on success, -1 on error.
661  */
662 static int allocate_data_structures(
663     rrd_t *rrd,
664     char ***updvals,
665     rrd_value_t **pdp_temp,
666     const char *tmplt,
667     long **tmpl_idx,
668     unsigned long *tmpl_cnt,
669     unsigned long **rra_step_cnt,
670     unsigned long **skip_update,
671     rrd_value_t **pdp_new)
672 {
673     unsigned  i, ii;
674     if ((*updvals = (char **) malloc(sizeof(char *)
675                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
676         rrd_set_error("allocating updvals pointer array.");
677         return -1;
678     }
679     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
680                                             * rrd->stat_head->ds_cnt)) ==
681         NULL) {
682         rrd_set_error("allocating pdp_temp.");
683         goto err_free_updvals;
684     }
685     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
686                                                  *
687                                                  rrd->stat_head->rra_cnt)) ==
688         NULL) {
689         rrd_set_error("allocating skip_update.");
690         goto err_free_pdp_temp;
691     }
692     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
693                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
694         rrd_set_error("allocating tmpl_idx.");
695         goto err_free_skip_update;
696     }
697     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
698                                                   *
699                                                   (rrd->stat_head->
700                                                    rra_cnt))) == NULL) {
701         rrd_set_error("allocating rra_step_cnt.");
702         goto err_free_tmpl_idx;
703     }
704
705     /* initialize tmplt redirector */
706     /* default config example (assume DS 1 is a CDEF DS)
707        tmpl_idx[0] -> 0; (time)
708        tmpl_idx[1] -> 1; (DS 0)
709        tmpl_idx[2] -> 3; (DS 2)
710        tmpl_idx[3] -> 4; (DS 3) */
711     (*tmpl_idx)[0] = 0; /* time */
712     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
713         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
714             (*tmpl_idx)[ii++] = i;
715     }
716     *tmpl_cnt = ii;
717
718     if (tmplt != NULL) {
719         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
720             goto err_free_rra_step_cnt;
721         }
722     }
723
724     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
725                                            * rrd->stat_head->ds_cnt)) == NULL) {
726         rrd_set_error("allocating pdp_new.");
727         goto err_free_rra_step_cnt;
728     }
729
730     return 0;
731
732   err_free_rra_step_cnt:
733     free(*rra_step_cnt);
734   err_free_tmpl_idx:
735     free(*tmpl_idx);
736   err_free_skip_update:
737     free(*skip_update);
738   err_free_pdp_temp:
739     free(*pdp_temp);
740   err_free_updvals:
741     free(*updvals);
742     return -1;
743 }
744
745 /*
746  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
747  *
748  * Returns 0 on success.
749  */
750 static int parse_template(
751     rrd_t *rrd,
752     const char *tmplt,
753     unsigned long *tmpl_cnt,
754     long *tmpl_idx)
755 {
756     char     *dsname, *tmplt_copy;
757     unsigned int tmpl_len, i;
758     int       ret = 0;
759
760     *tmpl_cnt = 1;      /* the first entry is the time */
761
762     /* we should work on a writeable copy here */
763     if ((tmplt_copy = strdup(tmplt)) == NULL) {
764         rrd_set_error("error copying tmplt '%s'", tmplt);
765         ret = -1;
766         goto out;
767     }
768
769     dsname = tmplt_copy;
770     tmpl_len = strlen(tmplt_copy);
771     for (i = 0; i <= tmpl_len; i++) {
772         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
773             tmplt_copy[i] = '\0';
774             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
775                 rrd_set_error("tmplt contains more DS definitions than RRD");
776                 ret = -1;
777                 goto out_free_tmpl_copy;
778             }
779             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
780                 rrd_set_error("unknown DS name '%s'", dsname);
781                 ret = -1;
782                 goto out_free_tmpl_copy;
783             }
784             /* go to the next entry on the tmplt_copy */
785             if (i < tmpl_len)
786                 dsname = &tmplt_copy[i + 1];
787         }
788     }
789   out_free_tmpl_copy:
790     free(tmplt_copy);
791   out:
792     return ret;
793 }
794
795 /*
796  * Parse an update string, updates the primary data points (PDPs)
797  * and consolidated data points (CDPs), and writes changes to the RRAs.
798  *
799  * Returns 0 on success, -1 on error.
800  */
801 static int process_arg(
802     char *step_start,
803     rrd_t *rrd,
804     rrd_file_t *rrd_file,
805     unsigned long rra_begin,
806     time_t *current_time,
807     unsigned long *current_time_usec,
808     rrd_value_t *pdp_temp,
809     rrd_value_t *pdp_new,
810     unsigned long *rra_step_cnt,
811     char **updvals,
812     long *tmpl_idx,
813     unsigned long tmpl_cnt,
814     rrd_info_t ** pcdp_summary,
815     int version,
816     unsigned long *skip_update,
817     int *schedule_smooth)
818 {
819     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
820
821     /* a vector of future Holt-Winters seasonal coefs */
822     unsigned long elapsed_pdp_st;
823
824     double    interval, pre_int, post_int;  /* interval between this and
825                                              * the last run */
826     unsigned long proc_pdp_cnt;
827
828     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
829                  current_time, current_time_usec, version) == -1) {
830         return -1;
831     }
832
833     interval = (double) (*current_time - rrd->live_head->last_up)
834         + (double) ((long) *current_time_usec -
835                     (long) rrd->live_head->last_up_usec) / 1e6f;
836
837     /* process the data sources and update the pdp_prep 
838      * area accordingly */
839     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
840         return -1;
841     }
842
843     elapsed_pdp_st = calculate_elapsed_steps(rrd,
844                                              *current_time,
845                                              *current_time_usec, interval,
846                                              &pre_int, &post_int,
847                                              &proc_pdp_cnt);
848
849     /* has a pdp_st moment occurred since the last run ? */
850     if (elapsed_pdp_st == 0) {
851         /* no we have not passed a pdp_st moment. therefore update is simple */
852         simple_update(rrd, interval, pdp_new);
853     } else {
854         /* an pdp_st has occurred. */
855         if (process_all_pdp_st(rrd, interval,
856                                pre_int, post_int,
857                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
858             return -1;
859         }
860         if (update_all_cdp_prep(rrd, rra_step_cnt,
861                                 rra_begin, rrd_file,
862                                 elapsed_pdp_st,
863                                 proc_pdp_cnt,
864                                 &last_seasonal_coef,
865                                 &seasonal_coef,
866                                 pdp_temp,
867                                 skip_update, schedule_smooth) == -1) {
868             goto err_free_coefficients;
869         }
870         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
871                                  elapsed_pdp_st, pdp_temp,
872                                  &seasonal_coef) == -1) {
873             goto err_free_coefficients;
874         }
875         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
876                           *current_time, skip_update,
877                           pcdp_summary) == -1) {
878             goto err_free_coefficients;
879         }
880     }                   /* endif a pdp_st has occurred */
881     rrd->live_head->last_up = *current_time;
882     rrd->live_head->last_up_usec = *current_time_usec;
883
884     if (version < 3) {
885         *rrd->legacy_last_up = rrd->live_head->last_up;
886     }
887     free(seasonal_coef);
888     free(last_seasonal_coef);
889     return 0;
890
891   err_free_coefficients:
892     free(seasonal_coef);
893     free(last_seasonal_coef);
894     return -1;
895 }
896
897 /*
898  * Parse a DS string (time + colon-separated values), storing the
899  * results in current_time, current_time_usec, and updvals.
900  *
901  * Returns 0 on success, -1 on error.
902  */
903 static int parse_ds(
904     rrd_t *rrd,
905     char **updvals,
906     long *tmpl_idx,
907     char *input,
908     unsigned long tmpl_cnt,
909     time_t *current_time,
910     unsigned long *current_time_usec,
911     int version)
912 {
913     char     *p;
914     unsigned long i;
915     char      timesyntax;
916
917     updvals[0] = input;
918     /* initialize all ds input to unknown except the first one
919        which has always got to be set */
920     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
921         updvals[i] = "U";
922
923     /* separate all ds elements; first must be examined separately
924        due to alternate time syntax */
925     if ((p = strchr(input, '@')) != NULL) {
926         timesyntax = '@';
927     } else if ((p = strchr(input, ':')) != NULL) {
928         timesyntax = ':';
929     } else {
930         rrd_set_error("expected timestamp not found in data source from %s",
931                       input);
932         return -1;
933     }
934     *p = '\0';
935     i = 1;
936     updvals[tmpl_idx[i++]] = p + 1;
937     while (*(++p)) {
938         if (*p == ':') {
939             *p = '\0';
940             if (i < tmpl_cnt) {
941                 updvals[tmpl_idx[i++]] = p + 1;
942             }
943         }
944     }
945
946     if (i != tmpl_cnt) {
947         rrd_set_error("expected %lu data source readings (got %lu) from %s",
948                       tmpl_cnt - 1, i, input);
949         return -1;
950     }
951
952     if (get_time_from_reading(rrd, timesyntax, updvals,
953                               current_time, current_time_usec,
954                               version) == -1) {
955         return -1;
956     }
957     return 0;
958 }
959
960 /*
961  * Parse the time in a DS string, store it in current_time and 
962  * current_time_usec and verify that it's later than the last
963  * update for this DS.
964  *
965  * Returns 0 on success, -1 on error.
966  */
967 static int get_time_from_reading(
968     rrd_t *rrd,
969     char timesyntax,
970     char **updvals,
971     time_t *current_time,
972     unsigned long *current_time_usec,
973     int version)
974 {
975     double    tmp;
976     char     *parsetime_error = NULL;
977     char     *old_locale;
978     rrd_time_value_t ds_tv;
979     struct timeval tmp_time;    /* used for time conversion */
980
981     /* get the time from the reading ... handle N */
982     if (timesyntax == '@') {    /* at-style */
983         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
984             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
985             return -1;
986         }
987         if (ds_tv.type == RELATIVE_TO_END_TIME ||
988             ds_tv.type == RELATIVE_TO_START_TIME) {
989             rrd_set_error("specifying time relative to the 'start' "
990                           "or 'end' makes no sense here: %s", updvals[0]);
991             return -1;
992         }
993         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
994         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
995     } else if (strcmp(updvals[0], "N") == 0) {
996         gettimeofday(&tmp_time, 0);
997         normalize_time(&tmp_time);
998         *current_time = tmp_time.tv_sec;
999         *current_time_usec = tmp_time.tv_usec;
1000     } else {
1001         old_locale = setlocale(LC_NUMERIC, "C");
1002         tmp = strtod(updvals[0], 0);
1003         setlocale(LC_NUMERIC, old_locale);
1004         *current_time = floor(tmp);
1005         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1006     }
1007     /* dont do any correction for old version RRDs */
1008     if (version < 3)
1009         *current_time_usec = 0;
1010
1011     if (*current_time < rrd->live_head->last_up ||
1012         (*current_time == rrd->live_head->last_up &&
1013          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1014         rrd_set_error("illegal attempt to update using time %ld when "
1015                       "last update time is %ld (minimum one second step)",
1016                       *current_time, rrd->live_head->last_up);
1017         return -1;
1018     }
1019     return 0;
1020 }
1021
1022 /*
1023  * Update pdp_new by interpreting the updvals according to the DS type
1024  * (COUNTER, GAUGE, etc.).
1025  *
1026  * Returns 0 on success, -1 on error.
1027  */
1028 static int update_pdp_prep(
1029     rrd_t *rrd,
1030     char **updvals,
1031     rrd_value_t *pdp_new,
1032     double interval)
1033 {
1034     unsigned long ds_idx;
1035     int       ii;
1036     char     *endptr;   /* used in the conversion */
1037     double    rate;
1038     char     *old_locale;
1039     enum dst_en dst_idx;
1040
1041     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1042         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1043
1044         /* make sure we do not build diffs with old last_ds values */
1045         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1046             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1047             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1048         }
1049
1050         /* NOTE: DST_CDEF should never enter this if block, because
1051          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1052          * accidently specified a value for the DST_CDEF. To handle this case,
1053          * an extra check is required. */
1054
1055         if ((updvals[ds_idx + 1][0] != 'U') &&
1056             (dst_idx != DST_CDEF) &&
1057             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1058             rate = DNAN;
1059
1060             /* pdp_new contains rate * time ... eg the bytes transferred during
1061              * the interval. Doing it this way saves a lot of math operations
1062              */
1063             switch (dst_idx) {
1064             case DST_COUNTER:
1065             case DST_DERIVE:
1066                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1067                     if ((updvals[ds_idx + 1][ii] < '0'
1068                          || updvals[ds_idx + 1][ii] > '9')
1069                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1070                         rrd_set_error("not a simple integer: '%s'",
1071                                       updvals[ds_idx + 1]);
1072                         return -1;
1073                     }
1074                 }
1075                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1076                     pdp_new[ds_idx] =
1077                         rrd_diff(updvals[ds_idx + 1],
1078                                  rrd->pdp_prep[ds_idx].last_ds);
1079                     if (dst_idx == DST_COUNTER) {
1080                         /* simple overflow catcher. This will fail
1081                          * terribly for non 32 or 64 bit counters
1082                          * ... are there any others in SNMP land?
1083                          */
1084                         if (pdp_new[ds_idx] < (double) 0.0)
1085                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1086                         if (pdp_new[ds_idx] < (double) 0.0)
1087                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1088                     }
1089                     rate = pdp_new[ds_idx] / interval;
1090                 } else {
1091                     pdp_new[ds_idx] = DNAN;
1092                 }
1093                 break;
1094             case DST_ABSOLUTE:
1095                 old_locale = setlocale(LC_NUMERIC, "C");
1096                 errno = 0;
1097                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1098                 setlocale(LC_NUMERIC, old_locale);
1099                 if (errno > 0) {
1100                     rrd_set_error("converting '%s' to float: %s",
1101                                   updvals[ds_idx + 1], rrd_strerror(errno));
1102                     return -1;
1103                 };
1104                 if (endptr[0] != '\0') {
1105                     rrd_set_error
1106                         ("conversion of '%s' to float not complete: tail '%s'",
1107                          updvals[ds_idx + 1], endptr);
1108                     return -1;
1109                 }
1110                 rate = pdp_new[ds_idx] / interval;
1111                 break;
1112             case DST_GAUGE:
1113                 errno = 0;
1114                 old_locale = setlocale(LC_NUMERIC, "C");
1115                 pdp_new[ds_idx] =
1116                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1117                 setlocale(LC_NUMERIC, old_locale);
1118                 if (errno) {
1119                     rrd_set_error("converting '%s' to float: %s",
1120                                   updvals[ds_idx + 1], rrd_strerror(errno));
1121                     return -1;
1122                 };
1123                 if (endptr[0] != '\0') {
1124                     rrd_set_error
1125                         ("conversion of '%s' to float not complete: tail '%s'",
1126                          updvals[ds_idx + 1], endptr);
1127                     return -1;
1128                 }
1129                 rate = pdp_new[ds_idx] / interval;
1130                 break;
1131             default:
1132                 rrd_set_error("rrd contains unknown DS type : '%s'",
1133                               rrd->ds_def[ds_idx].dst);
1134                 return -1;
1135             }
1136             /* break out of this for loop if the error string is set */
1137             if (rrd_test_error()) {
1138                 return -1;
1139             }
1140             /* make sure pdp_temp is neither too large or too small
1141              * if any of these occur it becomes unknown ...
1142              * sorry folks ... */
1143             if (!isnan(rate) &&
1144                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148                 pdp_new[ds_idx] = DNAN;
1149             }
1150         } else {
1151             /* no news is news all the same */
1152             pdp_new[ds_idx] = DNAN;
1153         }
1154
1155
1156         /* make a copy of the command line argument for the next run */
1157 #ifdef DEBUG
1158         fprintf(stderr, "prep ds[%lu]\t"
1159                 "last_arg '%s'\t"
1160                 "this_arg '%s'\t"
1161                 "pdp_new %10.2f\n",
1162                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163                 pdp_new[ds_idx]);
1164 #endif
1165         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166                 LAST_DS_LEN - 1);
1167         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1168     }
1169     return 0;
1170 }
1171
1172 /*
1173  * How many PDP steps have elapsed since the last update? Returns the answer,
1174  * and stores the time between the last update and the last PDP in pre_time,
1175  * and the time between the last PDP and the current time in post_int.
1176  */
1177 static int calculate_elapsed_steps(
1178     rrd_t *rrd,
1179     unsigned long current_time,
1180     unsigned long current_time_usec,
1181     double interval,
1182     double *pre_int,
1183     double *post_int,
1184     unsigned long *proc_pdp_cnt)
1185 {
1186     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1187     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1188                                  * time */
1189     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1190                                  * when it was last updated */
1191     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1192
1193     /* when was the current pdp started */
1194     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1196
1197     /* when did the last pdp_st occur */
1198     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199     occu_pdp_st = current_time - occu_pdp_age;
1200
1201     if (occu_pdp_st > proc_pdp_st) {
1202         /* OK we passed the pdp_st moment */
1203         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1204                                                                      * occurred before the latest
1205                                                                      * pdp_st moment*/
1206         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207         *post_int = occu_pdp_age;   /* how much after it */
1208         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1209     } else {
1210         *pre_int = interval;
1211         *post_int = 0;
1212     }
1213
1214     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1215
1216 #ifdef DEBUG
1217     printf("proc_pdp_age %lu\t"
1218            "proc_pdp_st %lu\t"
1219            "occu_pfp_age %lu\t"
1220            "occu_pdp_st %lu\t"
1221            "int %lf\t"
1222            "pre_int %lf\t"
1223            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1225 #endif
1226
1227     /* compute the number of elapsed pdp_st moments */
1228     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1229 }
1230
1231 /*
1232  * Increment the PDP values by the values in pdp_new, or else initialize them.
1233  */
1234 static void simple_update(
1235     rrd_t *rrd,
1236     double interval,
1237     rrd_value_t *pdp_new)
1238 {
1239     int       i;
1240
1241     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242         if (isnan(pdp_new[i])) {
1243             /* this is not really accurate if we use subsecond data arrival time
1244                should have thought of it when going subsecond resolution ...
1245                sorry next format change we will have it! */
1246             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1247                 floor(interval);
1248         } else {
1249             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1251             } else {
1252                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1253             }
1254         }
1255 #ifdef DEBUG
1256         fprintf(stderr,
1257                 "NO PDP  ds[%i]\t"
1258                 "value %10.2f\t"
1259                 "unkn_sec %5lu\n",
1260                 i,
1261                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1263 #endif
1264     }
1265 }
1266
1267 /*
1268  * Call process_pdp_st for each DS.
1269  *
1270  * Returns 0 on success, -1 on error.
1271  */
1272 static int process_all_pdp_st(
1273     rrd_t *rrd,
1274     double interval,
1275     double pre_int,
1276     double post_int,
1277     unsigned long elapsed_pdp_st,
1278     rrd_value_t *pdp_new,
1279     rrd_value_t *pdp_temp)
1280 {
1281     unsigned long ds_idx;
1282
1283     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284        rate*seconds which occurred up to the last run.
1285        pdp_new[] contains rate*seconds from the latest run.
1286        pdp_temp[] will contain the rate for cdp */
1287
1288     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1291                            pdp_new, pdp_temp) == -1) {
1292             return -1;
1293         }
1294 #ifdef DEBUG
1295         fprintf(stderr, "PDP UPD ds[%lu]\t"
1296                 "elapsed_pdp_st %lu\t"
1297                 "pdp_temp %10.2f\t"
1298                 "new_prep %10.2f\t"
1299                 "new_unkn_sec %5lu\n",
1300                 ds_idx,
1301                 elapsed_pdp_st,
1302                 pdp_temp[ds_idx],
1303                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1305 #endif
1306     }
1307     return 0;
1308 }
1309
1310 /*
1311  * Process an update that occurs after one of the PDP moments.
1312  * Increments the PDP value, sets NAN if time greater than the
1313  * heartbeats have elapsed, processes CDEFs.
1314  *
1315  * Returns 0 on success, -1 on error.
1316  */
1317 static int process_pdp_st(
1318     rrd_t *rrd,
1319     unsigned long ds_idx,
1320     double interval,
1321     double pre_int,
1322     double post_int,
1323     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1324     rrd_value_t *pdp_new,
1325     rrd_value_t *pdp_temp)
1326 {
1327     int       i;
1328
1329     /* update pdp_prep to the current pdp_st. */
1330     double    pre_unknown = 0.0;
1331     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1332     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1333
1334     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1335
1336     rpnstack_init(&rpnstack);
1337
1338
1339     if (isnan(pdp_new[ds_idx])) {
1340         /* a final bit of unknown to be added before calculation
1341            we use a temporary variable for this so that we
1342            don't have to turn integer lines before using the value */
1343         pre_unknown = pre_int;
1344     } else {
1345         if (isnan(scratch[PDP_val].u_val)) {
1346             scratch[PDP_val].u_val = 0;
1347         }
1348         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1349     }
1350
1351     /* if too much of the pdp_prep is unknown we dump it */
1352     /* if the interval is larger thatn mrhb we get NAN */
1353     if ((interval > mrhb) ||
1354         (rrd->stat_head->pdp_step / 2.0 <
1355          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356         pdp_temp[ds_idx] = DNAN;
1357     } else {
1358         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1360              pre_unknown);
1361     }
1362
1363     /* process CDEF data sources; remember each CDEF DS can
1364      * only reference other DS with a lower index number */
1365     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1366         rpnp_t   *rpnp;
1367
1368         rpnp =
1369             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1370         /* substitute data values for OP_VARIABLE nodes */
1371         for (i = 0; rpnp[i].op != OP_END; i++) {
1372             if (rpnp[i].op == OP_VARIABLE) {
1373                 rpnp[i].op = OP_NUMBER;
1374                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1375             }
1376         }
1377         /* run the rpn calculator */
1378         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1379             free(rpnp);
1380             rpnstack_free(&rpnstack);
1381             return -1;
1382         }
1383     }
1384
1385     /* make pdp_prep ready for the next run */
1386     if (isnan(pdp_new[ds_idx])) {
1387         /* this is not realy accurate if we use subsecond data arival time
1388            should have thought of it when going subsecond resolution ...
1389            sorry next format change we will have it! */
1390         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1391         scratch[PDP_val].u_val = DNAN;
1392     } else {
1393         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1394         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1395     }
1396     rpnstack_free(&rpnstack);
1397     return 0;
1398 }
1399
1400 /*
1401  * Iterate over all the RRAs for a given DS and:
1402  * 1. Decide whether to schedule a smooth later
1403  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1404  * 3. Update the CDP
1405  *
1406  * Returns 0 on success, -1 on error
1407  */
1408 static int update_all_cdp_prep(
1409     rrd_t *rrd,
1410     unsigned long *rra_step_cnt,
1411     unsigned long rra_begin,
1412     rrd_file_t *rrd_file,
1413     unsigned long elapsed_pdp_st,
1414     unsigned long proc_pdp_cnt,
1415     rrd_value_t **last_seasonal_coef,
1416     rrd_value_t **seasonal_coef,
1417     rrd_value_t *pdp_temp,
1418     unsigned long *skip_update,
1419     int *schedule_smooth)
1420 {
1421     unsigned long rra_idx;
1422
1423     /* index into the CDP scratch array */
1424     enum cf_en current_cf;
1425     unsigned long rra_start;
1426
1427     /* number of rows to be updated in an RRA for a data value. */
1428     unsigned long start_pdp_offset;
1429
1430     rra_start = rra_begin;
1431     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1432         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1433         start_pdp_offset =
1434             rrd->rra_def[rra_idx].pdp_cnt -
1435             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1436         skip_update[rra_idx] = 0;
1437         if (start_pdp_offset <= elapsed_pdp_st) {
1438             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1439                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1440         } else {
1441             rra_step_cnt[rra_idx] = 0;
1442         }
1443
1444         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1445             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1446              * so that they will be correct for the next observed value; note that for
1447              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1448              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1449             if (rra_step_cnt[rra_idx] > 1) {
1450                 skip_update[rra_idx] = 1;
1451                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1452                                 elapsed_pdp_st, last_seasonal_coef);
1453                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454                                 elapsed_pdp_st + 1, seasonal_coef);
1455             }
1456             /* periodically run a smoother for seasonal effects */
1457             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1458 #ifdef DEBUG
1459                 fprintf(stderr,
1460                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1461                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1462                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1463                         u_cnt);
1464 #endif
1465                 *schedule_smooth = 1;
1466             }
1467         }
1468         if (rrd_test_error())
1469             return -1;
1470
1471         if (update_cdp_prep
1472             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1473              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1474              current_cf) == -1) {
1475             return -1;
1476         }
1477         rra_start +=
1478             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1479             sizeof(rrd_value_t);
1480     }
1481     return 0;
1482 }
1483
1484 /* 
1485  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1486  */
1487 static int do_schedule_smooth(
1488     rrd_t *rrd,
1489     unsigned long rra_idx,
1490     unsigned long elapsed_pdp_st)
1491 {
1492     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1493     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1494     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1495     unsigned long seasonal_smooth_idx =
1496         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1497     unsigned long *init_seasonal =
1498         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1499
1500     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1501      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1502      * really an RRA level, not a data source within RRA level parameter, but
1503      * the rra_def is read only for rrd_update (not flushed to disk). */
1504     if (*init_seasonal > BURNIN_CYCLES) {
1505         /* someone has no doubt invented a trick to deal with this wrap around,
1506          * but at least this code is clear. */
1507         if (seasonal_smooth_idx > cur_row) {
1508             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1509              * between PDP and CDP */
1510             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1511         }
1512         /* can't rely on negative numbers because we are working with
1513          * unsigned values */
1514         return (cur_row + elapsed_pdp_st >= row_cnt
1515                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1516     }
1517     /* mark off one of the burn-in cycles */
1518     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1519 }
1520
1521 /*
1522  * For a given RRA, iterate over the data sources and call the appropriate
1523  * consolidation function.
1524  *
1525  * Returns 0 on success, -1 on error.
1526  */
1527 static int update_cdp_prep(
1528     rrd_t *rrd,
1529     unsigned long elapsed_pdp_st,
1530     unsigned long start_pdp_offset,
1531     unsigned long *rra_step_cnt,
1532     int rra_idx,
1533     rrd_value_t *pdp_temp,
1534     rrd_value_t *last_seasonal_coef,
1535     rrd_value_t *seasonal_coef,
1536     int current_cf)
1537 {
1538     unsigned long ds_idx, cdp_idx;
1539
1540     /* update CDP_PREP areas */
1541     /* loop over data soures within each RRA */
1542     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1543
1544         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1545
1546         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1547             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1548                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1549                        elapsed_pdp_st, start_pdp_offset,
1550                        rrd->rra_def[rra_idx].pdp_cnt,
1551                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1552                        rra_idx, ds_idx);
1553         } else {
1554             /* Nothing to consolidate if there's one PDP per CDP. However, if
1555              * we've missed some PDPs, let's update null counters etc. */
1556             if (elapsed_pdp_st > 2) {
1557                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1558                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1559                           current_cf);
1560             }
1561         }
1562
1563         if (rrd_test_error())
1564             return -1;
1565     }                   /* endif data sources loop */
1566     return 0;
1567 }
1568
1569 /*
1570  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1571  * primary value, secondary value, and # of unknowns.
1572  */
1573 static void update_cdp(
1574     unival *scratch,
1575     int current_cf,
1576     rrd_value_t pdp_temp_val,
1577     unsigned long rra_step_cnt,
1578     unsigned long elapsed_pdp_st,
1579     unsigned long start_pdp_offset,
1580     unsigned long pdp_cnt,
1581     rrd_value_t xff,
1582     int i,
1583     int ii)
1584 {
1585     /* shorthand variables */
1586     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1587     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1588     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1589     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1590
1591     if (rra_step_cnt) {
1592         /* If we are in this block, as least 1 CDP value will be written to
1593          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1594          * to be written, then the "fill in" value is the CDP_secondary_val
1595          * entry. */
1596         if (isnan(pdp_temp_val)) {
1597             *cdp_unkn_pdp_cnt += start_pdp_offset;
1598             *cdp_secondary_val = DNAN;
1599         } else {
1600             /* CDP_secondary value is the RRA "fill in" value for intermediary
1601              * CDP data entries. No matter the CF, the value is the same because
1602              * the average, max, min, and last of a list of identical values is
1603              * the same, namely, the value itself. */
1604             *cdp_secondary_val = pdp_temp_val;
1605         }
1606
1607         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1608             *cdp_primary_val = DNAN;
1609             if (current_cf == CF_AVERAGE) {
1610                 *cdp_val =
1611                     initialize_average_carry_over(pdp_temp_val,
1612                                                   elapsed_pdp_st,
1613                                                   start_pdp_offset, pdp_cnt);
1614             } else {
1615                 *cdp_val = pdp_temp_val;
1616             }
1617         } else {
1618             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1619                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1620         }               /* endif meets xff value requirement for a valid value */
1621         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1622          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1623         if (isnan(pdp_temp_val))
1624             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1625         else
1626             *cdp_unkn_pdp_cnt = 0;
1627     } else {            /* rra_step_cnt[i]  == 0 */
1628
1629 #ifdef DEBUG
1630         if (isnan(*cdp_val)) {
1631             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1632                     i, ii);
1633         } else {
1634             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1635                     i, ii, *cdp_val);
1636         }
1637 #endif
1638         if (isnan(pdp_temp_val)) {
1639             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1640         } else {
1641             *cdp_val =
1642                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1643                                   current_cf, i, ii);
1644         }
1645     }
1646 }
1647
1648 /*
1649  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1650  * on the type of consolidation function.
1651  */
1652 static void initialize_cdp_val(
1653     unival *scratch,
1654     int current_cf,
1655     rrd_value_t pdp_temp_val,
1656     unsigned long elapsed_pdp_st,
1657     unsigned long start_pdp_offset,
1658     unsigned long pdp_cnt)
1659 {
1660     rrd_value_t cum_val, cur_val;
1661
1662     switch (current_cf) {
1663     case CF_AVERAGE:
1664         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1665         cur_val = IFDNAN(pdp_temp_val, 0.0);
1666         scratch[CDP_primary_val].u_val =
1667             (cum_val + cur_val * start_pdp_offset) /
1668             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1669         scratch[CDP_val].u_val =
1670             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1671                                           start_pdp_offset, pdp_cnt);
1672         break;
1673     case CF_MAXIMUM:
1674         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1675         cur_val = IFDNAN(pdp_temp_val, -DINF);
1676 #if 0
1677 #ifdef DEBUG
1678         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1679             fprintf(stderr,
1680                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1681                     i, ii);
1682             exit(-1);
1683         }
1684 #endif
1685 #endif
1686         if (cur_val > cum_val)
1687             scratch[CDP_primary_val].u_val = cur_val;
1688         else
1689             scratch[CDP_primary_val].u_val = cum_val;
1690         /* initialize carry over value */
1691         scratch[CDP_val].u_val = pdp_temp_val;
1692         break;
1693     case CF_MINIMUM:
1694         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1695         cur_val = IFDNAN(pdp_temp_val, DINF);
1696 #if 0
1697 #ifdef DEBUG
1698         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699             fprintf(stderr,
1700                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1701                     ii);
1702             exit(-1);
1703         }
1704 #endif
1705 #endif
1706         if (cur_val < cum_val)
1707             scratch[CDP_primary_val].u_val = cur_val;
1708         else
1709             scratch[CDP_primary_val].u_val = cum_val;
1710         /* initialize carry over value */
1711         scratch[CDP_val].u_val = pdp_temp_val;
1712         break;
1713     case CF_LAST:
1714     default:
1715         scratch[CDP_primary_val].u_val = pdp_temp_val;
1716         /* initialize carry over value */
1717         scratch[CDP_val].u_val = pdp_temp_val;
1718         break;
1719     }
1720 }
1721
1722 /*
1723  * Update the consolidation function for Holt-Winters functions as
1724  * well as other functions that don't actually consolidate multiple
1725  * PDPs.
1726  */
1727 static void reset_cdp(
1728     rrd_t *rrd,
1729     unsigned long elapsed_pdp_st,
1730     rrd_value_t *pdp_temp,
1731     rrd_value_t *last_seasonal_coef,
1732     rrd_value_t *seasonal_coef,
1733     int rra_idx,
1734     int ds_idx,
1735     int cdp_idx,
1736     enum cf_en current_cf)
1737 {
1738     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1739
1740     switch (current_cf) {
1741     case CF_AVERAGE:
1742     default:
1743         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1744         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1745         break;
1746     case CF_SEASONAL:
1747     case CF_DEVSEASONAL:
1748         /* need to update cached seasonal values, so they are consistent
1749          * with the bulk update */
1750         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1751          * CDP_last_deviation are the same. */
1752         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1753         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1754         break;
1755     case CF_HWPREDICT:
1756     case CF_MHWPREDICT:
1757         /* need to update the null_count and last_null_count.
1758          * even do this for non-DNAN pdp_temp because the
1759          * algorithm is not learning from batch updates. */
1760         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1761         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1762         /* fall through */
1763     case CF_DEVPREDICT:
1764         scratch[CDP_primary_val].u_val = DNAN;
1765         scratch[CDP_secondary_val].u_val = DNAN;
1766         break;
1767     case CF_FAILURES:
1768         /* do not count missed bulk values as failures */
1769         scratch[CDP_primary_val].u_val = 0;
1770         scratch[CDP_secondary_val].u_val = 0;
1771         /* need to reset violations buffer.
1772          * could do this more carefully, but for now, just
1773          * assume a bulk update wipes away all violations. */
1774         erase_violations(rrd, cdp_idx, rra_idx);
1775         break;
1776     }
1777 }
1778
1779 static rrd_value_t initialize_average_carry_over(
1780     rrd_value_t pdp_temp_val,
1781     unsigned long elapsed_pdp_st,
1782     unsigned long start_pdp_offset,
1783     unsigned long pdp_cnt)
1784 {
1785     /* initialize carry over value */
1786     if (isnan(pdp_temp_val)) {
1787         return DNAN;
1788     }
1789     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1790 }
1791
1792 /*
1793  * Update or initialize a CDP value based on the consolidation
1794  * function.
1795  *
1796  * Returns the new value.
1797  */
1798 static rrd_value_t calculate_cdp_val(
1799     rrd_value_t cdp_val,
1800     rrd_value_t pdp_temp_val,
1801     unsigned long elapsed_pdp_st,
1802     int current_cf,
1803 #ifdef DEBUG
1804     int i,
1805     int ii
1806 #else
1807     int UNUSED(i),
1808     int UNUSED(ii)
1809 #endif
1810     )
1811 {
1812     if (isnan(cdp_val)) {
1813         if (current_cf == CF_AVERAGE) {
1814             pdp_temp_val *= elapsed_pdp_st;
1815         }
1816 #ifdef DEBUG
1817         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1818                 i, ii, pdp_temp_val);
1819 #endif
1820         return pdp_temp_val;
1821     }
1822     if (current_cf == CF_AVERAGE)
1823         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1824     if (current_cf == CF_MINIMUM)
1825         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1826     if (current_cf == CF_MAXIMUM)
1827         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1828
1829     return pdp_temp_val;
1830 }
1831
1832 /*
1833  * For each RRA, update the seasonal values and then call update_aberrant_CF
1834  * for each data source.
1835  *
1836  * Return 0 on success, -1 on error.
1837  */
1838 static int update_aberrant_cdps(
1839     rrd_t *rrd,
1840     rrd_file_t *rrd_file,
1841     unsigned long rra_begin,
1842     unsigned long elapsed_pdp_st,
1843     rrd_value_t *pdp_temp,
1844     rrd_value_t **seasonal_coef)
1845 {
1846     unsigned long rra_idx, ds_idx, j;
1847
1848     /* number of PDP steps since the last update that
1849      * are assigned to the first CDP to be generated
1850      * since the last update. */
1851     unsigned short scratch_idx;
1852     unsigned long rra_start;
1853     enum cf_en current_cf;
1854
1855     /* this loop is only entered if elapsed_pdp_st < 3 */
1856     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1857          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1858         rra_start = rra_begin;
1859         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1860             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1861                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1862                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1863                     if (scratch_idx == CDP_primary_val) {
1864                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1865                                         elapsed_pdp_st + 1, seasonal_coef);
1866                     } else {
1867                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868                                         elapsed_pdp_st + 2, seasonal_coef);
1869                     }
1870                 }
1871                 if (rrd_test_error())
1872                     return -1;
1873                 /* loop over data soures within each RRA */
1874                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1875                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1876                                        rra_idx * (rrd->stat_head->ds_cnt) +
1877                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1878                                        *seasonal_coef);
1879                 }
1880             }
1881             rra_start += rrd->rra_def[rra_idx].row_cnt
1882                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1883         }
1884     }
1885     return 0;
1886 }
1887
1888 /* 
1889  * Move sequentially through the file, writing one RRA at a time.  Note this
1890  * architecture divorces the computation of CDP with flushing updated RRA
1891  * entries to disk.
1892  *
1893  * Return 0 on success, -1 on error.
1894  */
1895 static int write_to_rras(
1896     rrd_t *rrd,
1897     rrd_file_t *rrd_file,
1898     unsigned long *rra_step_cnt,
1899     unsigned long rra_begin,
1900     time_t current_time,
1901     unsigned long *skip_update,
1902     rrd_info_t ** pcdp_summary)
1903 {
1904     unsigned long rra_idx;
1905     unsigned long rra_start;
1906     time_t    rra_time = 0; /* time of update for a RRA */
1907
1908     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1909     
1910     /* Ready to write to disk */
1911     rra_start = rra_begin;
1912
1913     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1914         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1915         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1916
1917         /* for cdp_prep */
1918         unsigned short scratch_idx;
1919         unsigned long step_subtract;
1920
1921         for (scratch_idx = CDP_primary_val,
1922                  step_subtract = 1;
1923              rra_step_cnt[rra_idx] > 0;
1924              rra_step_cnt[rra_idx]--,
1925                  scratch_idx = CDP_secondary_val,
1926                  step_subtract = 2) {
1927
1928             off_t rra_pos_new;
1929 #ifdef DEBUG
1930             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1931 #endif
1932             /* increment, with wrap-around */
1933             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1934               rra_ptr->cur_row = 0;
1935
1936             /* we know what our position should be */
1937             rra_pos_new = rra_start
1938               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1939
1940             /* re-seek if the position is wrong or we wrapped around */
1941             if (rra_pos_new != rrd_file->pos) {
1942                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1943                     rrd_set_error("seek error in rrd");
1944                     return -1;
1945                 }
1946             }
1947 #ifdef DEBUG
1948             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1949 #endif
1950
1951             if (skip_update[rra_idx])
1952                 continue;
1953
1954             if (*pcdp_summary != NULL) {
1955                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1956
1957                 rra_time = (current_time - current_time % step_time)
1958                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1959             }
1960
1961             if (write_RRA_row
1962                 (rrd_file, rrd, rra_idx, scratch_idx,
1963                  pcdp_summary, rra_time) == -1)
1964                 return -1;
1965         }
1966
1967         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1968     } /* RRA LOOP */
1969
1970     return 0;
1971 }
1972
1973 /*
1974  * Write out one row of values (one value per DS) to the archive.
1975  *
1976  * Returns 0 on success, -1 on error.
1977  */
1978 static int write_RRA_row(
1979     rrd_file_t *rrd_file,
1980     rrd_t *rrd,
1981     unsigned long rra_idx,
1982     unsigned short CDP_scratch_idx,
1983     rrd_info_t ** pcdp_summary,
1984     time_t rra_time)
1985 {
1986     unsigned long ds_idx, cdp_idx;
1987     rrd_infoval_t iv;
1988
1989     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1990         /* compute the cdp index */
1991         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1992 #ifdef DEBUG
1993         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1994                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1995                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1996 #endif
1997         if (*pcdp_summary != NULL) {
1998             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1999             /* append info to the return hash */
2000             *pcdp_summary = rrd_info_push(*pcdp_summary,
2001                                           sprintf_alloc
2002                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2003                                            rrd->rra_def[rra_idx].cf_nam,
2004                                            rrd->rra_def[rra_idx].pdp_cnt,
2005                                            rrd->ds_def[ds_idx].ds_nam),
2006                                           RD_I_VAL, iv);
2007         }
2008         if (rrd_write(rrd_file,
2009                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2010                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2011             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2012             return -1;
2013         }
2014     }
2015     return 0;
2016 }
2017
2018 /*
2019  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2020  *
2021  * Returns 0 on success, -1 otherwise
2022  */
2023 static int smooth_all_rras(
2024     rrd_t *rrd,
2025     rrd_file_t *rrd_file,
2026     unsigned long rra_begin)
2027 {
2028     unsigned long rra_start = rra_begin;
2029     unsigned long rra_idx;
2030
2031     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2032         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2033             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2034 #ifdef DEBUG
2035             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2036 #endif
2037             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2038             if (rrd_test_error())
2039                 return -1;
2040         }
2041         rra_start += rrd->rra_def[rra_idx].row_cnt
2042             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2043     }
2044     return 0;
2045 }
2046
2047 #ifndef HAVE_MMAP
2048 /*
2049  * Flush changes to disk (unless we're using mmap)
2050  *
2051  * Returns 0 on success, -1 otherwise
2052  */
2053 static int write_changes_to_disk(
2054     rrd_t *rrd,
2055     rrd_file_t *rrd_file,
2056     int version)
2057 {
2058     /* we just need to write back the live header portion now */
2059     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2060                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2061                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2062                  SEEK_SET) != 0) {
2063         rrd_set_error("seek rrd for live header writeback");
2064         return -1;
2065     }
2066     if (version >= 3) {
2067         if (rrd_write(rrd_file, rrd->live_head,
2068                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2069             rrd_set_error("rrd_write live_head to rrd");
2070             return -1;
2071         }
2072     } else {
2073         if (rrd_write(rrd_file, rrd->legacy_last_up,
2074                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2075             rrd_set_error("rrd_write live_head to rrd");
2076             return -1;
2077         }
2078     }
2079
2080
2081     if (rrd_write(rrd_file, rrd->pdp_prep,
2082                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2083         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2084         rrd_set_error("rrd_write pdp_prep to rrd");
2085         return -1;
2086     }
2087
2088     if (rrd_write(rrd_file, rrd->cdp_prep,
2089                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2090                   rrd->stat_head->ds_cnt)
2091         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2092                       rrd->stat_head->ds_cnt)) {
2093
2094         rrd_set_error("rrd_write cdp_prep to rrd");
2095         return -1;
2096     }
2097
2098     if (rrd_write(rrd_file, rrd->rra_ptr,
2099                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2100         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2101         rrd_set_error("rrd_write rra_ptr to rrd");
2102         return -1;
2103     }
2104     return 0;
2105 }
2106 #endif