src/rrd_update.c: Remove the long options `--[no]cache'.
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322
323 rrd_info_t *rrd_update_v(
324     int argc,
325     char **argv)
326 {
327     char     *tmplt = NULL;
328     rrd_info_t *result = NULL;
329     rrd_infoval_t rc;
330     struct option long_options[] = {
331         {"template", required_argument, 0, 't'},
332         {0, 0, 0, 0}
333     };
334
335     rc.u_int = -1;
336     optind = 0;
337     opterr = 0;         /* initialize getopt */
338
339     while (1) {
340         int       option_index = 0;
341         int       opt;
342
343         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344
345         if (opt == EOF)
346             break;
347
348         switch (opt) {
349         case 't':
350             tmplt = optarg;
351             break;
352
353         case '?':
354             rrd_set_error("unknown option '%s'", argv[optind - 1]);
355             goto end_tag;
356         }
357     }
358
359     /* need at least 2 arguments: filename, data. */
360     if (argc - optind < 2) {
361         rrd_set_error("Not enough arguments");
362         goto end_tag;
363     }
364     rc.u_int = 0;
365     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366     rc.u_int = _rrd_update(argv[optind], tmplt,
367                            argc - optind - 1,
368                            (const char **) (argv + optind + 1), result);
369     result->value.u_int = rc.u_int;
370   end_tag:
371     return result;
372 }
373
374 int rrd_update(
375     int argc,
376     char **argv)
377 {
378     struct option long_options[] = {
379         {"template", required_argument, 0, 't'},
380         {"daemon",   required_argument, 0, 'd'},
381         {0, 0, 0, 0}
382     };
383     int       option_index = 0;
384     int       opt;
385     char     *tmplt = NULL;
386     int       rc = -1;
387     char     *daemon = NULL;
388
389     optind = 0;
390     opterr = 0;         /* initialize getopt */
391
392     while (1) {
393         opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
394
395         if (opt == EOF)
396             break;
397
398         switch (opt) {
399         case 't':
400             tmplt = strdup(optarg);
401             break;
402
403         case 'd':
404             if (daemon != NULL)
405                 free (daemon);
406             daemon = strdup (optarg);
407             if (daemon == NULL)
408             {
409                 rrd_set_error("strdup failed.");
410                 goto out;
411             }
412             break;
413
414         case '?':
415             rrd_set_error("unknown option '%s'", argv[optind - 1]);
416             goto out;
417         }
418     }
419
420     /* need at least 2 arguments: filename, data. */
421     if (argc - optind < 2) {
422         rrd_set_error("Not enough arguments");
423         goto out;
424     }
425
426     if ((tmplt != NULL) && (daemon != NULL))
427     {
428         rrd_set_error("The caching daemon cannot be used together with "
429                 "templates yet.");
430         goto out;
431     }
432
433     if (daemon != NULL)
434     {
435         int status;
436
437         status = rrdc_connect (daemon);
438         if (status != 0)
439         {
440             rrd_set_error("Unable to connect to daemon: %s",
441                     (status < 0)
442                     ? "Internal error"
443                     : rrd_strerror (status));
444             goto out;
445         }
446
447         status = rrdc_update (/* file = */ argv[optind],
448                 /* values_num = */ argc - optind - 1,
449                 /* values = */ (void *) (argv + optind + 1));
450         if (status != 0)
451         {
452             rrd_set_error("Failed sending the values to the daemon: %s",
453                     (status < 0)
454                     ? "Internal error"
455                     : rrd_strerror (status));
456         }
457
458         rrdc_disconnect ();
459         goto out;
460     } /* if (daemon != NULL) */
461
462     rc = rrd_update_r(argv[optind], tmplt,
463                       argc - optind - 1, (const char **) (argv + optind + 1));
464   out:
465     if (tmplt != NULL)
466     {
467         free(tmplt);
468         tmplt = NULL;
469     }
470     if (daemon != NULL)
471     {
472         free (daemon);
473         daemon = NULL;
474     }
475     return rc;
476 }
477
478 int rrd_update_r(
479     const char *filename,
480     const char *tmplt,
481     int argc,
482     const char **argv)
483 {
484     return _rrd_update(filename, tmplt, argc, argv, NULL);
485 }
486
487 int _rrd_update(
488     const char *filename,
489     const char *tmplt,
490     int argc,
491     const char **argv,
492     rrd_info_t * pcdp_summary)
493 {
494
495     int       arg_i = 2;
496
497     unsigned long rra_begin;    /* byte pointer to the rra
498                                  * area in the rrd file.  this
499                                  * pointer never changes value */
500     unsigned long rra_current;  /* byte pointer to the current write
501                                  * spot in the rrd file. */
502     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
503                              * to the existing entry */
504     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
505                              * to the cdp values */
506
507     long     *tmpl_idx; /* index representing the settings
508                          * transported by the tmplt index */
509     unsigned long tmpl_cnt = 2; /* time and data */
510     rrd_t     rrd;
511     time_t    current_time = 0;
512     unsigned long current_time_usec = 0;    /* microseconds part of current time */
513     char    **updvals;
514     int       schedule_smooth = 0;
515
516     /* number of elapsed PDP steps since last update */
517     unsigned long *rra_step_cnt = NULL;
518
519     int       version;  /* rrd version */
520     rrd_file_t *rrd_file;
521     char     *arg_copy; /* for processing the argv */
522     unsigned long *skip_update; /* RRAs to advance but not write */
523
524     /* need at least 1 arguments: data. */
525     if (argc < 1) {
526         rrd_set_error("Not enough arguments");
527         goto err_out;
528     }
529
530     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
531         goto err_free;
532     }
533     /* We are now at the beginning of the rra's */
534     rra_current = rra_begin = rrd_file->header_len;
535
536     version = atoi(rrd.stat_head->version);
537
538     initialize_time(&current_time, &current_time_usec, version);
539
540     /* get exclusive lock to whole file.
541      * lock gets removed when we close the file.
542      */
543     if (rrd_lock(rrd_file) != 0) {
544         rrd_set_error("could not lock RRD");
545         goto err_close;
546     }
547
548     if (allocate_data_structures(&rrd, &updvals,
549                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
550                                  &rra_step_cnt, &skip_update,
551                                  &pdp_new) == -1) {
552         goto err_close;
553     }
554
555     /* loop through the arguments. */
556     for (arg_i = 0; arg_i < argc; arg_i++) {
557         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
558             rrd_set_error("failed duplication argv entry");
559             break;
560         }
561         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
562                         &current_time, &current_time_usec, pdp_temp, pdp_new,
563                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
564                         &pcdp_summary, version, skip_update,
565                         &schedule_smooth) == -1) {
566             free(arg_copy);
567             break;
568         }
569         free(arg_copy);
570     }
571
572     free(rra_step_cnt);
573
574     /* if we got here and if there is an error and if the file has not been
575      * written to, then close things up and return. */
576     if (rrd_test_error()) {
577         goto err_free_structures;
578     }
579 #ifndef HAVE_MMAP
580     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
581         goto err_free_structures;
582     }
583 #endif
584
585     /* calling the smoothing code here guarantees at most one smoothing
586      * operation per rrd_update call. Unfortunately, it is possible with bulk
587      * updates, or a long-delayed update for smoothing to occur off-schedule.
588      * This really isn't critical except during the burn-in cycles. */
589     if (schedule_smooth) {
590         smooth_all_rras(&rrd, rrd_file, rra_begin);
591     }
592
593 /*    rrd_dontneed(rrd_file,&rrd); */
594     rrd_free(&rrd);
595     rrd_close(rrd_file);
596
597     free(pdp_new);
598     free(tmpl_idx);
599     free(pdp_temp);
600     free(skip_update);
601     free(updvals);
602     return 0;
603
604   err_free_structures:
605     free(pdp_new);
606     free(tmpl_idx);
607     free(pdp_temp);
608     free(skip_update);
609     free(updvals);
610   err_close:
611     rrd_close(rrd_file);
612   err_free:
613     rrd_free(&rrd);
614   err_out:
615     return -1;
616 }
617
618 /*
619  * get exclusive lock to whole file.
620  * lock gets removed when we close the file
621  *
622  * returns 0 on success
623  */
624 int rrd_lock(
625     rrd_file_t *file)
626 {
627     int       rcstat;
628
629     {
630 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
631         struct _stat st;
632
633         if (_fstat(file->fd, &st) == 0) {
634             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
635         } else {
636             rcstat = -1;
637         }
638 #else
639         struct flock lock;
640
641         lock.l_type = F_WRLCK;  /* exclusive write lock */
642         lock.l_len = 0; /* whole file */
643         lock.l_start = 0;   /* start of file */
644         lock.l_whence = SEEK_SET;   /* end of file */
645
646         rcstat = fcntl(file->fd, F_SETLK, &lock);
647 #endif
648     }
649
650     return (rcstat);
651 }
652
653 /*
654  * Allocate some important arrays used, and initialize the template.
655  *
656  * When it returns, either all of the structures are allocated
657  * or none of them are.
658  *
659  * Returns 0 on success, -1 on error.
660  */
661 static int allocate_data_structures(
662     rrd_t *rrd,
663     char ***updvals,
664     rrd_value_t **pdp_temp,
665     const char *tmplt,
666     long **tmpl_idx,
667     unsigned long *tmpl_cnt,
668     unsigned long **rra_step_cnt,
669     unsigned long **skip_update,
670     rrd_value_t **pdp_new)
671 {
672     unsigned  i, ii;
673     if ((*updvals = (char **) malloc(sizeof(char *)
674                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
675         rrd_set_error("allocating updvals pointer array.");
676         return -1;
677     }
678     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
679                                             * rrd->stat_head->ds_cnt)) ==
680         NULL) {
681         rrd_set_error("allocating pdp_temp.");
682         goto err_free_updvals;
683     }
684     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
685                                                  *
686                                                  rrd->stat_head->rra_cnt)) ==
687         NULL) {
688         rrd_set_error("allocating skip_update.");
689         goto err_free_pdp_temp;
690     }
691     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
692                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
693         rrd_set_error("allocating tmpl_idx.");
694         goto err_free_skip_update;
695     }
696     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
697                                                   *
698                                                   (rrd->stat_head->
699                                                    rra_cnt))) == NULL) {
700         rrd_set_error("allocating rra_step_cnt.");
701         goto err_free_tmpl_idx;
702     }
703
704     /* initialize tmplt redirector */
705     /* default config example (assume DS 1 is a CDEF DS)
706        tmpl_idx[0] -> 0; (time)
707        tmpl_idx[1] -> 1; (DS 0)
708        tmpl_idx[2] -> 3; (DS 2)
709        tmpl_idx[3] -> 4; (DS 3) */
710     (*tmpl_idx)[0] = 0; /* time */
711     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
712         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
713             (*tmpl_idx)[ii++] = i;
714     }
715     *tmpl_cnt = ii;
716
717     if (tmplt != NULL) {
718         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
719             goto err_free_rra_step_cnt;
720         }
721     }
722
723     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
724                                            * rrd->stat_head->ds_cnt)) == NULL) {
725         rrd_set_error("allocating pdp_new.");
726         goto err_free_rra_step_cnt;
727     }
728
729     return 0;
730
731   err_free_rra_step_cnt:
732     free(*rra_step_cnt);
733   err_free_tmpl_idx:
734     free(*tmpl_idx);
735   err_free_skip_update:
736     free(*skip_update);
737   err_free_pdp_temp:
738     free(*pdp_temp);
739   err_free_updvals:
740     free(*updvals);
741     return -1;
742 }
743
744 /*
745  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
746  *
747  * Returns 0 on success.
748  */
749 static int parse_template(
750     rrd_t *rrd,
751     const char *tmplt,
752     unsigned long *tmpl_cnt,
753     long *tmpl_idx)
754 {
755     char     *dsname, *tmplt_copy;
756     unsigned int tmpl_len, i;
757     int       ret = 0;
758
759     *tmpl_cnt = 1;      /* the first entry is the time */
760
761     /* we should work on a writeable copy here */
762     if ((tmplt_copy = strdup(tmplt)) == NULL) {
763         rrd_set_error("error copying tmplt '%s'", tmplt);
764         ret = -1;
765         goto out;
766     }
767
768     dsname = tmplt_copy;
769     tmpl_len = strlen(tmplt_copy);
770     for (i = 0; i <= tmpl_len; i++) {
771         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
772             tmplt_copy[i] = '\0';
773             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
774                 rrd_set_error("tmplt contains more DS definitions than RRD");
775                 ret = -1;
776                 goto out_free_tmpl_copy;
777             }
778             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
779                 rrd_set_error("unknown DS name '%s'", dsname);
780                 ret = -1;
781                 goto out_free_tmpl_copy;
782             }
783             /* go to the next entry on the tmplt_copy */
784             if (i < tmpl_len)
785                 dsname = &tmplt_copy[i + 1];
786         }
787     }
788   out_free_tmpl_copy:
789     free(tmplt_copy);
790   out:
791     return ret;
792 }
793
794 /*
795  * Parse an update string, updates the primary data points (PDPs)
796  * and consolidated data points (CDPs), and writes changes to the RRAs.
797  *
798  * Returns 0 on success, -1 on error.
799  */
800 static int process_arg(
801     char *step_start,
802     rrd_t *rrd,
803     rrd_file_t *rrd_file,
804     unsigned long rra_begin,
805     unsigned long *rra_current,
806     time_t *current_time,
807     unsigned long *current_time_usec,
808     rrd_value_t *pdp_temp,
809     rrd_value_t *pdp_new,
810     unsigned long *rra_step_cnt,
811     char **updvals,
812     long *tmpl_idx,
813     unsigned long tmpl_cnt,
814     rrd_info_t ** pcdp_summary,
815     int version,
816     unsigned long *skip_update,
817     int *schedule_smooth)
818 {
819     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
820
821     /* a vector of future Holt-Winters seasonal coefs */
822     unsigned long elapsed_pdp_st;
823
824     double    interval, pre_int, post_int;  /* interval between this and
825                                              * the last run */
826     unsigned long proc_pdp_cnt;
827     unsigned long rra_start;
828
829     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
830                  current_time, current_time_usec, version) == -1) {
831         return -1;
832     }
833     /* seek to the beginning of the rra's */
834     if (*rra_current != rra_begin) {
835 #ifndef HAVE_MMAP
836         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
837             rrd_set_error("seek error in rrd");
838             return -1;
839         }
840 #endif
841         *rra_current = rra_begin;
842     }
843     rra_start = rra_begin;
844
845     interval = (double) (*current_time - rrd->live_head->last_up)
846         + (double) ((long) *current_time_usec -
847                     (long) rrd->live_head->last_up_usec) / 1e6f;
848
849     /* process the data sources and update the pdp_prep 
850      * area accordingly */
851     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
852         return -1;
853     }
854
855     elapsed_pdp_st = calculate_elapsed_steps(rrd,
856                                              *current_time,
857                                              *current_time_usec, interval,
858                                              &pre_int, &post_int,
859                                              &proc_pdp_cnt);
860
861     /* has a pdp_st moment occurred since the last run ? */
862     if (elapsed_pdp_st == 0) {
863         /* no we have not passed a pdp_st moment. therefore update is simple */
864         simple_update(rrd, interval, pdp_new);
865     } else {
866         /* an pdp_st has occurred. */
867         if (process_all_pdp_st(rrd, interval,
868                                pre_int, post_int,
869                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
870             return -1;
871         }
872         if (update_all_cdp_prep(rrd, rra_step_cnt,
873                                 rra_begin, rrd_file,
874                                 elapsed_pdp_st,
875                                 proc_pdp_cnt,
876                                 &last_seasonal_coef,
877                                 &seasonal_coef,
878                                 pdp_temp, rra_current,
879                                 skip_update, schedule_smooth) == -1) {
880             goto err_free_coefficients;
881         }
882         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
883                                  elapsed_pdp_st, pdp_temp,
884                                  &seasonal_coef) == -1) {
885             goto err_free_coefficients;
886         }
887         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
888                           rra_current, *current_time, skip_update,
889                           pcdp_summary) == -1) {
890             goto err_free_coefficients;
891         }
892     }                   /* endif a pdp_st has occurred */
893     rrd->live_head->last_up = *current_time;
894     rrd->live_head->last_up_usec = *current_time_usec;
895
896     if (version < 3) {
897         *rrd->legacy_last_up = rrd->live_head->last_up;
898     }
899     free(seasonal_coef);
900     free(last_seasonal_coef);
901     return 0;
902
903   err_free_coefficients:
904     free(seasonal_coef);
905     free(last_seasonal_coef);
906     return -1;
907 }
908
909 /*
910  * Parse a DS string (time + colon-separated values), storing the
911  * results in current_time, current_time_usec, and updvals.
912  *
913  * Returns 0 on success, -1 on error.
914  */
915 static int parse_ds(
916     rrd_t *rrd,
917     char **updvals,
918     long *tmpl_idx,
919     char *input,
920     unsigned long tmpl_cnt,
921     time_t *current_time,
922     unsigned long *current_time_usec,
923     int version)
924 {
925     char     *p;
926     unsigned long i;
927     char      timesyntax;
928
929     updvals[0] = input;
930     /* initialize all ds input to unknown except the first one
931        which has always got to be set */
932     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
933         updvals[i] = "U";
934
935     /* separate all ds elements; first must be examined separately
936        due to alternate time syntax */
937     if ((p = strchr(input, '@')) != NULL) {
938         timesyntax = '@';
939     } else if ((p = strchr(input, ':')) != NULL) {
940         timesyntax = ':';
941     } else {
942         rrd_set_error("expected timestamp not found in data source from %s",
943                       input);
944         return -1;
945     }
946     *p = '\0';
947     i = 1;
948     updvals[tmpl_idx[i++]] = p + 1;
949     while (*(++p)) {
950         if (*p == ':') {
951             *p = '\0';
952             if (i < tmpl_cnt) {
953                 updvals[tmpl_idx[i++]] = p + 1;
954             }
955         }
956     }
957
958     if (i != tmpl_cnt) {
959         rrd_set_error("expected %lu data source readings (got %lu) from %s",
960                       tmpl_cnt - 1, i, input);
961         return -1;
962     }
963
964     if (get_time_from_reading(rrd, timesyntax, updvals,
965                               current_time, current_time_usec,
966                               version) == -1) {
967         return -1;
968     }
969     return 0;
970 }
971
972 /*
973  * Parse the time in a DS string, store it in current_time and 
974  * current_time_usec and verify that it's later than the last
975  * update for this DS.
976  *
977  * Returns 0 on success, -1 on error.
978  */
979 static int get_time_from_reading(
980     rrd_t *rrd,
981     char timesyntax,
982     char **updvals,
983     time_t *current_time,
984     unsigned long *current_time_usec,
985     int version)
986 {
987     double    tmp;
988     char     *parsetime_error = NULL;
989     char     *old_locale;
990     rrd_time_value_t ds_tv;
991     struct timeval tmp_time;    /* used for time conversion */
992
993     /* get the time from the reading ... handle N */
994     if (timesyntax == '@') {    /* at-style */
995         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
996             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
997             return -1;
998         }
999         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1000             ds_tv.type == RELATIVE_TO_START_TIME) {
1001             rrd_set_error("specifying time relative to the 'start' "
1002                           "or 'end' makes no sense here: %s", updvals[0]);
1003             return -1;
1004         }
1005         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1006         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1007     } else if (strcmp(updvals[0], "N") == 0) {
1008         gettimeofday(&tmp_time, 0);
1009         normalize_time(&tmp_time);
1010         *current_time = tmp_time.tv_sec;
1011         *current_time_usec = tmp_time.tv_usec;
1012     } else {
1013         old_locale = setlocale(LC_NUMERIC, "C");
1014         tmp = strtod(updvals[0], 0);
1015         setlocale(LC_NUMERIC, old_locale);
1016         *current_time = floor(tmp);
1017         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1018     }
1019     /* dont do any correction for old version RRDs */
1020     if (version < 3)
1021         *current_time_usec = 0;
1022
1023     if (*current_time < rrd->live_head->last_up ||
1024         (*current_time == rrd->live_head->last_up &&
1025          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1026         rrd_set_error("illegal attempt to update using time %ld when "
1027                       "last update time is %ld (minimum one second step)",
1028                       *current_time, rrd->live_head->last_up);
1029         return -1;
1030     }
1031     return 0;
1032 }
1033
1034 /*
1035  * Update pdp_new by interpreting the updvals according to the DS type
1036  * (COUNTER, GAUGE, etc.).
1037  *
1038  * Returns 0 on success, -1 on error.
1039  */
1040 static int update_pdp_prep(
1041     rrd_t *rrd,
1042     char **updvals,
1043     rrd_value_t *pdp_new,
1044     double interval)
1045 {
1046     unsigned long ds_idx;
1047     int       ii;
1048     char     *endptr;   /* used in the conversion */
1049     double    rate;
1050     char     *old_locale;
1051     enum dst_en dst_idx;
1052
1053     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1054         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1055
1056         /* make sure we do not build diffs with old last_ds values */
1057         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1058             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1059             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1060         }
1061
1062         /* NOTE: DST_CDEF should never enter this if block, because
1063          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1064          * accidently specified a value for the DST_CDEF. To handle this case,
1065          * an extra check is required. */
1066
1067         if ((updvals[ds_idx + 1][0] != 'U') &&
1068             (dst_idx != DST_CDEF) &&
1069             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1070             rate = DNAN;
1071
1072             /* pdp_new contains rate * time ... eg the bytes transferred during
1073              * the interval. Doing it this way saves a lot of math operations
1074              */
1075             switch (dst_idx) {
1076             case DST_COUNTER:
1077             case DST_DERIVE:
1078                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1079                     if ((updvals[ds_idx + 1][ii] < '0'
1080                          || updvals[ds_idx + 1][ii] > '9')
1081                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1082                         rrd_set_error("not a simple integer: '%s'",
1083                                       updvals[ds_idx + 1]);
1084                         return -1;
1085                     }
1086                 }
1087                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1088                     pdp_new[ds_idx] =
1089                         rrd_diff(updvals[ds_idx + 1],
1090                                  rrd->pdp_prep[ds_idx].last_ds);
1091                     if (dst_idx == DST_COUNTER) {
1092                         /* simple overflow catcher. This will fail
1093                          * terribly for non 32 or 64 bit counters
1094                          * ... are there any others in SNMP land?
1095                          */
1096                         if (pdp_new[ds_idx] < (double) 0.0)
1097                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1098                         if (pdp_new[ds_idx] < (double) 0.0)
1099                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1100                     }
1101                     rate = pdp_new[ds_idx] / interval;
1102                 } else {
1103                     pdp_new[ds_idx] = DNAN;
1104                 }
1105                 break;
1106             case DST_ABSOLUTE:
1107                 old_locale = setlocale(LC_NUMERIC, "C");
1108                 errno = 0;
1109                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1110                 setlocale(LC_NUMERIC, old_locale);
1111                 if (errno > 0) {
1112                     rrd_set_error("converting '%s' to float: %s",
1113                                   updvals[ds_idx + 1], rrd_strerror(errno));
1114                     return -1;
1115                 };
1116                 if (endptr[0] != '\0') {
1117                     rrd_set_error
1118                         ("conversion of '%s' to float not complete: tail '%s'",
1119                          updvals[ds_idx + 1], endptr);
1120                     return -1;
1121                 }
1122                 rate = pdp_new[ds_idx] / interval;
1123                 break;
1124             case DST_GAUGE:
1125                 errno = 0;
1126                 old_locale = setlocale(LC_NUMERIC, "C");
1127                 pdp_new[ds_idx] =
1128                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1129                 setlocale(LC_NUMERIC, old_locale);
1130                 if (errno) {
1131                     rrd_set_error("converting '%s' to float: %s",
1132                                   updvals[ds_idx + 1], rrd_strerror(errno));
1133                     return -1;
1134                 };
1135                 if (endptr[0] != '\0') {
1136                     rrd_set_error
1137                         ("conversion of '%s' to float not complete: tail '%s'",
1138                          updvals[ds_idx + 1], endptr);
1139                     return -1;
1140                 }
1141                 rate = pdp_new[ds_idx] / interval;
1142                 break;
1143             default:
1144                 rrd_set_error("rrd contains unknown DS type : '%s'",
1145                               rrd->ds_def[ds_idx].dst);
1146                 return -1;
1147             }
1148             /* break out of this for loop if the error string is set */
1149             if (rrd_test_error()) {
1150                 return -1;
1151             }
1152             /* make sure pdp_temp is neither too large or too small
1153              * if any of these occur it becomes unknown ...
1154              * sorry folks ... */
1155             if (!isnan(rate) &&
1156                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1157                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1158                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1159                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1160                 pdp_new[ds_idx] = DNAN;
1161             }
1162         } else {
1163             /* no news is news all the same */
1164             pdp_new[ds_idx] = DNAN;
1165         }
1166
1167
1168         /* make a copy of the command line argument for the next run */
1169 #ifdef DEBUG
1170         fprintf(stderr, "prep ds[%lu]\t"
1171                 "last_arg '%s'\t"
1172                 "this_arg '%s'\t"
1173                 "pdp_new %10.2f\n",
1174                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1175                 pdp_new[ds_idx]);
1176 #endif
1177         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1178                 LAST_DS_LEN - 1);
1179         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1180     }
1181     return 0;
1182 }
1183
1184 /*
1185  * How many PDP steps have elapsed since the last update? Returns the answer,
1186  * and stores the time between the last update and the last PDP in pre_time,
1187  * and the time between the last PDP and the current time in post_int.
1188  */
1189 static int calculate_elapsed_steps(
1190     rrd_t *rrd,
1191     unsigned long current_time,
1192     unsigned long current_time_usec,
1193     double interval,
1194     double *pre_int,
1195     double *post_int,
1196     unsigned long *proc_pdp_cnt)
1197 {
1198     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1199     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1200                                  * time */
1201     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1202                                  * when it was last updated */
1203     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1204
1205     /* when was the current pdp started */
1206     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1207     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1208
1209     /* when did the last pdp_st occur */
1210     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1211     occu_pdp_st = current_time - occu_pdp_age;
1212
1213     if (occu_pdp_st > proc_pdp_st) {
1214         /* OK we passed the pdp_st moment */
1215         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1216                                                                      * occurred before the latest
1217                                                                      * pdp_st moment*/
1218         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1219         *post_int = occu_pdp_age;   /* how much after it */
1220         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1221     } else {
1222         *pre_int = interval;
1223         *post_int = 0;
1224     }
1225
1226     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1227
1228 #ifdef DEBUG
1229     printf("proc_pdp_age %lu\t"
1230            "proc_pdp_st %lu\t"
1231            "occu_pfp_age %lu\t"
1232            "occu_pdp_st %lu\t"
1233            "int %lf\t"
1234            "pre_int %lf\t"
1235            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1236            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1237 #endif
1238
1239     /* compute the number of elapsed pdp_st moments */
1240     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1241 }
1242
1243 /*
1244  * Increment the PDP values by the values in pdp_new, or else initialize them.
1245  */
1246 static void simple_update(
1247     rrd_t *rrd,
1248     double interval,
1249     rrd_value_t *pdp_new)
1250 {
1251     int       i;
1252
1253     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1254         if (isnan(pdp_new[i])) {
1255             /* this is not really accurate if we use subsecond data arrival time
1256                should have thought of it when going subsecond resolution ...
1257                sorry next format change we will have it! */
1258             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1259                 floor(interval);
1260         } else {
1261             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1262                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1263             } else {
1264                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1265             }
1266         }
1267 #ifdef DEBUG
1268         fprintf(stderr,
1269                 "NO PDP  ds[%i]\t"
1270                 "value %10.2f\t"
1271                 "unkn_sec %5lu\n",
1272                 i,
1273                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1274                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1275 #endif
1276     }
1277 }
1278
1279 /*
1280  * Call process_pdp_st for each DS.
1281  *
1282  * Returns 0 on success, -1 on error.
1283  */
1284 static int process_all_pdp_st(
1285     rrd_t *rrd,
1286     double interval,
1287     double pre_int,
1288     double post_int,
1289     unsigned long elapsed_pdp_st,
1290     rrd_value_t *pdp_new,
1291     rrd_value_t *pdp_temp)
1292 {
1293     unsigned long ds_idx;
1294
1295     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1296        rate*seconds which occurred up to the last run.
1297        pdp_new[] contains rate*seconds from the latest run.
1298        pdp_temp[] will contain the rate for cdp */
1299
1300     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1301         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1302                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1303                            pdp_new, pdp_temp) == -1) {
1304             return -1;
1305         }
1306 #ifdef DEBUG
1307         fprintf(stderr, "PDP UPD ds[%lu]\t"
1308                 "elapsed_pdp_st %lu\t"
1309                 "pdp_temp %10.2f\t"
1310                 "new_prep %10.2f\t"
1311                 "new_unkn_sec %5lu\n",
1312                 ds_idx,
1313                 elapsed_pdp_st,
1314                 pdp_temp[ds_idx],
1315                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1316                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1317 #endif
1318     }
1319     return 0;
1320 }
1321
1322 /*
1323  * Process an update that occurs after one of the PDP moments.
1324  * Increments the PDP value, sets NAN if time greater than the
1325  * heartbeats have elapsed, processes CDEFs.
1326  *
1327  * Returns 0 on success, -1 on error.
1328  */
1329 static int process_pdp_st(
1330     rrd_t *rrd,
1331     unsigned long ds_idx,
1332     double interval,
1333     double pre_int,
1334     double post_int,
1335     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1336     rrd_value_t *pdp_new,
1337     rrd_value_t *pdp_temp)
1338 {
1339     int       i;
1340
1341     /* update pdp_prep to the current pdp_st. */
1342     double    pre_unknown = 0.0;
1343     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1344     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1345
1346     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1347
1348     rpnstack_init(&rpnstack);
1349
1350
1351     if (isnan(pdp_new[ds_idx])) {
1352         /* a final bit of unknown to be added before calculation
1353            we use a temporary variable for this so that we
1354            don't have to turn integer lines before using the value */
1355         pre_unknown = pre_int;
1356     } else {
1357         if (isnan(scratch[PDP_val].u_val)) {
1358             scratch[PDP_val].u_val = 0;
1359         }
1360         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1361     }
1362
1363     /* if too much of the pdp_prep is unknown we dump it */
1364     /* if the interval is larger thatn mrhb we get NAN */
1365     if ((interval > mrhb) ||
1366         (rrd->stat_head->pdp_step / 2.0 <
1367          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1368         pdp_temp[ds_idx] = DNAN;
1369     } else {
1370         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1371             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1372              pre_unknown);
1373     }
1374
1375     /* process CDEF data sources; remember each CDEF DS can
1376      * only reference other DS with a lower index number */
1377     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1378         rpnp_t   *rpnp;
1379
1380         rpnp =
1381             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1382         /* substitute data values for OP_VARIABLE nodes */
1383         for (i = 0; rpnp[i].op != OP_END; i++) {
1384             if (rpnp[i].op == OP_VARIABLE) {
1385                 rpnp[i].op = OP_NUMBER;
1386                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1387             }
1388         }
1389         /* run the rpn calculator */
1390         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1391             free(rpnp);
1392             rpnstack_free(&rpnstack);
1393             return -1;
1394         }
1395     }
1396
1397     /* make pdp_prep ready for the next run */
1398     if (isnan(pdp_new[ds_idx])) {
1399         /* this is not realy accurate if we use subsecond data arival time
1400            should have thought of it when going subsecond resolution ...
1401            sorry next format change we will have it! */
1402         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1403         scratch[PDP_val].u_val = DNAN;
1404     } else {
1405         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1406         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1407     }
1408     rpnstack_free(&rpnstack);
1409     return 0;
1410 }
1411
1412 /*
1413  * Iterate over all the RRAs for a given DS and:
1414  * 1. Decide whether to schedule a smooth later
1415  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1416  * 3. Update the CDP
1417  *
1418  * Returns 0 on success, -1 on error
1419  */
1420 static int update_all_cdp_prep(
1421     rrd_t *rrd,
1422     unsigned long *rra_step_cnt,
1423     unsigned long rra_begin,
1424     rrd_file_t *rrd_file,
1425     unsigned long elapsed_pdp_st,
1426     unsigned long proc_pdp_cnt,
1427     rrd_value_t **last_seasonal_coef,
1428     rrd_value_t **seasonal_coef,
1429     rrd_value_t *pdp_temp,
1430     unsigned long *rra_current,
1431     unsigned long *skip_update,
1432     int *schedule_smooth)
1433 {
1434     unsigned long rra_idx;
1435
1436     /* index into the CDP scratch array */
1437     enum cf_en current_cf;
1438     unsigned long rra_start;
1439
1440     /* number of rows to be updated in an RRA for a data value. */
1441     unsigned long start_pdp_offset;
1442
1443     rra_start = rra_begin;
1444     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1445         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1446         start_pdp_offset =
1447             rrd->rra_def[rra_idx].pdp_cnt -
1448             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1449         skip_update[rra_idx] = 0;
1450         if (start_pdp_offset <= elapsed_pdp_st) {
1451             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1452                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1453         } else {
1454             rra_step_cnt[rra_idx] = 0;
1455         }
1456
1457         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1458             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1459              * so that they will be correct for the next observed value; note that for
1460              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1461              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1462             if (rra_step_cnt[rra_idx] > 1) {
1463                 skip_update[rra_idx] = 1;
1464                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1465                                 elapsed_pdp_st, last_seasonal_coef);
1466                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1467                                 elapsed_pdp_st + 1, seasonal_coef);
1468             }
1469             /* periodically run a smoother for seasonal effects */
1470             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1471 #ifdef DEBUG
1472                 fprintf(stderr,
1473                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1474                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1475                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1476                         u_cnt);
1477 #endif
1478                 *schedule_smooth = 1;
1479             }
1480             *rra_current = rrd_tell(rrd_file);
1481         }
1482         if (rrd_test_error())
1483             return -1;
1484
1485         if (update_cdp_prep
1486             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1487              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1488              current_cf) == -1) {
1489             return -1;
1490         }
1491         rra_start +=
1492             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1493             sizeof(rrd_value_t);
1494     }
1495     return 0;
1496 }
1497
1498 /* 
1499  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1500  */
1501 static int do_schedule_smooth(
1502     rrd_t *rrd,
1503     unsigned long rra_idx,
1504     unsigned long elapsed_pdp_st)
1505 {
1506     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1507     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1508     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1509     unsigned long seasonal_smooth_idx =
1510         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1511     unsigned long *init_seasonal =
1512         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1513
1514     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1515      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1516      * really an RRA level, not a data source within RRA level parameter, but
1517      * the rra_def is read only for rrd_update (not flushed to disk). */
1518     if (*init_seasonal > BURNIN_CYCLES) {
1519         /* someone has no doubt invented a trick to deal with this wrap around,
1520          * but at least this code is clear. */
1521         if (seasonal_smooth_idx > cur_row) {
1522             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1523              * between PDP and CDP */
1524             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1525         }
1526         /* can't rely on negative numbers because we are working with
1527          * unsigned values */
1528         return (cur_row + elapsed_pdp_st >= row_cnt
1529                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1530     }
1531     /* mark off one of the burn-in cycles */
1532     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1533 }
1534
1535 /*
1536  * For a given RRA, iterate over the data sources and call the appropriate
1537  * consolidation function.
1538  *
1539  * Returns 0 on success, -1 on error.
1540  */
1541 static int update_cdp_prep(
1542     rrd_t *rrd,
1543     unsigned long elapsed_pdp_st,
1544     unsigned long start_pdp_offset,
1545     unsigned long *rra_step_cnt,
1546     int rra_idx,
1547     rrd_value_t *pdp_temp,
1548     rrd_value_t *last_seasonal_coef,
1549     rrd_value_t *seasonal_coef,
1550     int current_cf)
1551 {
1552     unsigned long ds_idx, cdp_idx;
1553
1554     /* update CDP_PREP areas */
1555     /* loop over data soures within each RRA */
1556     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1557
1558         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1559
1560         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1561             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1562                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1563                        elapsed_pdp_st, start_pdp_offset,
1564                        rrd->rra_def[rra_idx].pdp_cnt,
1565                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1566                        rra_idx, ds_idx);
1567         } else {
1568             /* Nothing to consolidate if there's one PDP per CDP. However, if
1569              * we've missed some PDPs, let's update null counters etc. */
1570             if (elapsed_pdp_st > 2) {
1571                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1572                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1573                           current_cf);
1574             }
1575         }
1576
1577         if (rrd_test_error())
1578             return -1;
1579     }                   /* endif data sources loop */
1580     return 0;
1581 }
1582
1583 /*
1584  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1585  * primary value, secondary value, and # of unknowns.
1586  */
1587 static void update_cdp(
1588     unival *scratch,
1589     int current_cf,
1590     rrd_value_t pdp_temp_val,
1591     unsigned long rra_step_cnt,
1592     unsigned long elapsed_pdp_st,
1593     unsigned long start_pdp_offset,
1594     unsigned long pdp_cnt,
1595     rrd_value_t xff,
1596     int i,
1597     int ii)
1598 {
1599     /* shorthand variables */
1600     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1601     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1602     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1603     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1604
1605     if (rra_step_cnt) {
1606         /* If we are in this block, as least 1 CDP value will be written to
1607          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1608          * to be written, then the "fill in" value is the CDP_secondary_val
1609          * entry. */
1610         if (isnan(pdp_temp_val)) {
1611             *cdp_unkn_pdp_cnt += start_pdp_offset;
1612             *cdp_secondary_val = DNAN;
1613         } else {
1614             /* CDP_secondary value is the RRA "fill in" value for intermediary
1615              * CDP data entries. No matter the CF, the value is the same because
1616              * the average, max, min, and last of a list of identical values is
1617              * the same, namely, the value itself. */
1618             *cdp_secondary_val = pdp_temp_val;
1619         }
1620
1621         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1622             *cdp_primary_val = DNAN;
1623             if (current_cf == CF_AVERAGE) {
1624                 *cdp_val =
1625                     initialize_average_carry_over(pdp_temp_val,
1626                                                   elapsed_pdp_st,
1627                                                   start_pdp_offset, pdp_cnt);
1628             } else {
1629                 *cdp_val = pdp_temp_val;
1630             }
1631         } else {
1632             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1633                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1634         }               /* endif meets xff value requirement for a valid value */
1635         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1636          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1637         if (isnan(pdp_temp_val))
1638             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1639         else
1640             *cdp_unkn_pdp_cnt = 0;
1641     } else {            /* rra_step_cnt[i]  == 0 */
1642
1643 #ifdef DEBUG
1644         if (isnan(*cdp_val)) {
1645             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1646                     i, ii);
1647         } else {
1648             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1649                     i, ii, *cdp_val);
1650         }
1651 #endif
1652         if (isnan(pdp_temp_val)) {
1653             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1654         } else {
1655             *cdp_val =
1656                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1657                                   current_cf, i, ii);
1658         }
1659     }
1660 }
1661
1662 /*
1663  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1664  * on the type of consolidation function.
1665  */
1666 static void initialize_cdp_val(
1667     unival *scratch,
1668     int current_cf,
1669     rrd_value_t pdp_temp_val,
1670     unsigned long elapsed_pdp_st,
1671     unsigned long start_pdp_offset,
1672     unsigned long pdp_cnt)
1673 {
1674     rrd_value_t cum_val, cur_val;
1675
1676     switch (current_cf) {
1677     case CF_AVERAGE:
1678         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1679         cur_val = IFDNAN(pdp_temp_val, 0.0);
1680         scratch[CDP_primary_val].u_val =
1681             (cum_val + cur_val * start_pdp_offset) /
1682             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1683         scratch[CDP_val].u_val =
1684             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1685                                           start_pdp_offset, pdp_cnt);
1686         break;
1687     case CF_MAXIMUM:
1688         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1689         cur_val = IFDNAN(pdp_temp_val, -DINF);
1690 #if 0
1691 #ifdef DEBUG
1692         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1693             fprintf(stderr,
1694                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1695                     i, ii);
1696             exit(-1);
1697         }
1698 #endif
1699 #endif
1700         if (cur_val > cum_val)
1701             scratch[CDP_primary_val].u_val = cur_val;
1702         else
1703             scratch[CDP_primary_val].u_val = cum_val;
1704         /* initialize carry over value */
1705         scratch[CDP_val].u_val = pdp_temp_val;
1706         break;
1707     case CF_MINIMUM:
1708         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1709         cur_val = IFDNAN(pdp_temp_val, DINF);
1710 #if 0
1711 #ifdef DEBUG
1712         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1713             fprintf(stderr,
1714                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1715                     ii);
1716             exit(-1);
1717         }
1718 #endif
1719 #endif
1720         if (cur_val < cum_val)
1721             scratch[CDP_primary_val].u_val = cur_val;
1722         else
1723             scratch[CDP_primary_val].u_val = cum_val;
1724         /* initialize carry over value */
1725         scratch[CDP_val].u_val = pdp_temp_val;
1726         break;
1727     case CF_LAST:
1728     default:
1729         scratch[CDP_primary_val].u_val = pdp_temp_val;
1730         /* initialize carry over value */
1731         scratch[CDP_val].u_val = pdp_temp_val;
1732         break;
1733     }
1734 }
1735
1736 /*
1737  * Update the consolidation function for Holt-Winters functions as
1738  * well as other functions that don't actually consolidate multiple
1739  * PDPs.
1740  */
1741 static void reset_cdp(
1742     rrd_t *rrd,
1743     unsigned long elapsed_pdp_st,
1744     rrd_value_t *pdp_temp,
1745     rrd_value_t *last_seasonal_coef,
1746     rrd_value_t *seasonal_coef,
1747     int rra_idx,
1748     int ds_idx,
1749     int cdp_idx,
1750     enum cf_en current_cf)
1751 {
1752     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1753
1754     switch (current_cf) {
1755     case CF_AVERAGE:
1756     default:
1757         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1758         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1759         break;
1760     case CF_SEASONAL:
1761     case CF_DEVSEASONAL:
1762         /* need to update cached seasonal values, so they are consistent
1763          * with the bulk update */
1764         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1765          * CDP_last_deviation are the same. */
1766         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1767         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1768         break;
1769     case CF_HWPREDICT:
1770     case CF_MHWPREDICT:
1771         /* need to update the null_count and last_null_count.
1772          * even do this for non-DNAN pdp_temp because the
1773          * algorithm is not learning from batch updates. */
1774         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1775         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1776         /* fall through */
1777     case CF_DEVPREDICT:
1778         scratch[CDP_primary_val].u_val = DNAN;
1779         scratch[CDP_secondary_val].u_val = DNAN;
1780         break;
1781     case CF_FAILURES:
1782         /* do not count missed bulk values as failures */
1783         scratch[CDP_primary_val].u_val = 0;
1784         scratch[CDP_secondary_val].u_val = 0;
1785         /* need to reset violations buffer.
1786          * could do this more carefully, but for now, just
1787          * assume a bulk update wipes away all violations. */
1788         erase_violations(rrd, cdp_idx, rra_idx);
1789         break;
1790     }
1791 }
1792
1793 static rrd_value_t initialize_average_carry_over(
1794     rrd_value_t pdp_temp_val,
1795     unsigned long elapsed_pdp_st,
1796     unsigned long start_pdp_offset,
1797     unsigned long pdp_cnt)
1798 {
1799     /* initialize carry over value */
1800     if (isnan(pdp_temp_val)) {
1801         return DNAN;
1802     }
1803     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1804 }
1805
1806 /*
1807  * Update or initialize a CDP value based on the consolidation
1808  * function.
1809  *
1810  * Returns the new value.
1811  */
1812 static rrd_value_t calculate_cdp_val(
1813     rrd_value_t cdp_val,
1814     rrd_value_t pdp_temp_val,
1815     unsigned long elapsed_pdp_st,
1816     int current_cf,
1817 #ifdef DEBUG
1818     int i,
1819     int ii
1820 #else
1821     int UNUSED(i),
1822     int UNUSED(ii)
1823 #endif
1824     )
1825 {
1826     if (isnan(cdp_val)) {
1827         if (current_cf == CF_AVERAGE) {
1828             pdp_temp_val *= elapsed_pdp_st;
1829         }
1830 #ifdef DEBUG
1831         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1832                 i, ii, pdp_temp_val);
1833 #endif
1834         return pdp_temp_val;
1835     }
1836     if (current_cf == CF_AVERAGE)
1837         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1838     if (current_cf == CF_MINIMUM)
1839         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1840     if (current_cf == CF_MAXIMUM)
1841         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1842
1843     return pdp_temp_val;
1844 }
1845
1846 /*
1847  * For each RRA, update the seasonal values and then call update_aberrant_CF
1848  * for each data source.
1849  *
1850  * Return 0 on success, -1 on error.
1851  */
1852 static int update_aberrant_cdps(
1853     rrd_t *rrd,
1854     rrd_file_t *rrd_file,
1855     unsigned long rra_begin,
1856     unsigned long *rra_current,
1857     unsigned long elapsed_pdp_st,
1858     rrd_value_t *pdp_temp,
1859     rrd_value_t **seasonal_coef)
1860 {
1861     unsigned long rra_idx, ds_idx, j;
1862
1863     /* number of PDP steps since the last update that
1864      * are assigned to the first CDP to be generated
1865      * since the last update. */
1866     unsigned short scratch_idx;
1867     unsigned long rra_start;
1868     enum cf_en current_cf;
1869
1870     /* this loop is only entered if elapsed_pdp_st < 3 */
1871     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1872          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1873         rra_start = rra_begin;
1874         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1875             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1876                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1877                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1878                     if (scratch_idx == CDP_primary_val) {
1879                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1880                                         elapsed_pdp_st + 1, seasonal_coef);
1881                     } else {
1882                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1883                                         elapsed_pdp_st + 2, seasonal_coef);
1884                     }
1885                     *rra_current = rrd_tell(rrd_file);
1886                 }
1887                 if (rrd_test_error())
1888                     return -1;
1889                 /* loop over data soures within each RRA */
1890                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1891                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1892                                        rra_idx * (rrd->stat_head->ds_cnt) +
1893                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1894                                        *seasonal_coef);
1895                 }
1896             }
1897             rra_start += rrd->rra_def[rra_idx].row_cnt
1898                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1899         }
1900     }
1901     return 0;
1902 }
1903
1904 /* 
1905  * Move sequentially through the file, writing one RRA at a time.  Note this
1906  * architecture divorces the computation of CDP with flushing updated RRA
1907  * entries to disk.
1908  *
1909  * Return 0 on success, -1 on error.
1910  */
1911 static int write_to_rras(
1912     rrd_t *rrd,
1913     rrd_file_t *rrd_file,
1914     unsigned long *rra_step_cnt,
1915     unsigned long rra_begin,
1916     unsigned long *rra_current,
1917     time_t current_time,
1918     unsigned long *skip_update,
1919     rrd_info_t ** pcdp_summary)
1920 {
1921     unsigned long rra_idx;
1922     unsigned long rra_start;
1923     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1924     time_t    rra_time = 0; /* time of update for a RRA */
1925
1926     /* Ready to write to disk */
1927     rra_start = rra_begin;
1928     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1929         /* skip unless there's something to write */
1930         if (rra_step_cnt[rra_idx]) {
1931             /* write the first row */
1932 #ifdef DEBUG
1933             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1934 #endif
1935             rrd->rra_ptr[rra_idx].cur_row++;
1936             if (rrd->rra_ptr[rra_idx].cur_row >=
1937                 rrd->rra_def[rra_idx].row_cnt)
1938                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1939             /* position on the first row */
1940             rra_pos_tmp = rra_start +
1941                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1942                 sizeof(rrd_value_t);
1943             if (rra_pos_tmp != *rra_current) {
1944                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1945                     rrd_set_error("seek error in rrd");
1946                     return -1;
1947                 }
1948                 *rra_current = rra_pos_tmp;
1949             }
1950 #ifdef DEBUG
1951             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1952 #endif
1953             if (!skip_update[rra_idx]) {
1954                 if (*pcdp_summary != NULL) {
1955                     rra_time = (current_time - current_time
1956                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1957                                    rrd->stat_head->pdp_step))
1958                         -
1959                         ((rra_step_cnt[rra_idx] -
1960                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1961                          rrd->stat_head->pdp_step);
1962                 }
1963                 if (write_RRA_row
1964                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1965                      pcdp_summary, rra_time) == -1)
1966                     return -1;
1967             }
1968
1969             /* write other rows of the bulk update, if any */
1970             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1971                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1972                     rrd->rra_def[rra_idx].row_cnt) {
1973 #ifdef DEBUG
1974                     fprintf(stderr,
1975                             "Wraparound for RRA %s, %lu updates left\n",
1976                             rrd->rra_def[rra_idx].cf_nam,
1977                             rra_step_cnt[rra_idx] - 1);
1978 #endif
1979                     /* wrap */
1980                     rrd->rra_ptr[rra_idx].cur_row = 0;
1981                     /* seek back to beginning of current rra */
1982                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1983                         rrd_set_error("seek error in rrd");
1984                         return -1;
1985                     }
1986 #ifdef DEBUG
1987                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1988                             rrd_file->pos);
1989 #endif
1990                     *rra_current = rra_start;
1991                 }
1992                 if (!skip_update[rra_idx]) {
1993                     if (*pcdp_summary != NULL) {
1994                         rra_time = (current_time - current_time
1995                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1996                                        rrd->stat_head->pdp_step))
1997                             -
1998                             ((rra_step_cnt[rra_idx] -
1999                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2000                              rrd->stat_head->pdp_step);
2001                     }
2002                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2003                                       CDP_secondary_val, pcdp_summary,
2004                                       rra_time) == -1)
2005                         return -1;
2006                 }
2007             }
2008         }
2009         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2010             sizeof(rrd_value_t);
2011     }                   /* RRA LOOP */
2012
2013     return 0;
2014 }
2015
2016 /*
2017  * Write out one row of values (one value per DS) to the archive.
2018  *
2019  * Returns 0 on success, -1 on error.
2020  */
2021 static int write_RRA_row(
2022     rrd_file_t *rrd_file,
2023     rrd_t *rrd,
2024     unsigned long rra_idx,
2025     unsigned long *rra_current,
2026     unsigned short CDP_scratch_idx,
2027     rrd_info_t ** pcdp_summary,
2028     time_t rra_time)
2029 {
2030     unsigned long ds_idx, cdp_idx;
2031     rrd_infoval_t iv;
2032
2033     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2034         /* compute the cdp index */
2035         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2036 #ifdef DEBUG
2037         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2038                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2039                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2040 #endif
2041         if (*pcdp_summary != NULL) {
2042             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2043             /* append info to the return hash */
2044             *pcdp_summary = rrd_info_push(*pcdp_summary,
2045                                           sprintf_alloc
2046                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2047                                            rrd->rra_def[rra_idx].cf_nam,
2048                                            rrd->rra_def[rra_idx].pdp_cnt,
2049                                            rrd->ds_def[ds_idx].ds_nam),
2050                                           RD_I_VAL, iv);
2051         }
2052         if (rrd_write(rrd_file,
2053                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2054                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2055             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2056             return -1;
2057         }
2058         *rra_current += sizeof(rrd_value_t);
2059     }
2060     return 0;
2061 }
2062
2063 /*
2064  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2065  *
2066  * Returns 0 on success, -1 otherwise
2067  */
2068 static int smooth_all_rras(
2069     rrd_t *rrd,
2070     rrd_file_t *rrd_file,
2071     unsigned long rra_begin)
2072 {
2073     unsigned long rra_start = rra_begin;
2074     unsigned long rra_idx;
2075
2076     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2077         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2078             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2079 #ifdef DEBUG
2080             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2081 #endif
2082             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2083             if (rrd_test_error())
2084                 return -1;
2085         }
2086         rra_start += rrd->rra_def[rra_idx].row_cnt
2087             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2088     }
2089     return 0;
2090 }
2091
2092 #ifndef HAVE_MMAP
2093 /*
2094  * Flush changes to disk (unless we're using mmap)
2095  *
2096  * Returns 0 on success, -1 otherwise
2097  */
2098 static int write_changes_to_disk(
2099     rrd_t *rrd,
2100     rrd_file_t *rrd_file,
2101     int version)
2102 {
2103     /* we just need to write back the live header portion now */
2104     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2105                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2106                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2107                  SEEK_SET) != 0) {
2108         rrd_set_error("seek rrd for live header writeback");
2109         return -1;
2110     }
2111     if (version >= 3) {
2112         if (rrd_write(rrd_file, rrd->live_head,
2113                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2114             rrd_set_error("rrd_write live_head to rrd");
2115             return -1;
2116         }
2117     } else {
2118         if (rrd_write(rrd_file, rrd->legacy_last_up,
2119                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2120             rrd_set_error("rrd_write live_head to rrd");
2121             return -1;
2122         }
2123     }
2124
2125
2126     if (rrd_write(rrd_file, rrd->pdp_prep,
2127                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2128         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2129         rrd_set_error("rrd_write pdp_prep to rrd");
2130         return -1;
2131     }
2132
2133     if (rrd_write(rrd_file, rrd->cdp_prep,
2134                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2135                   rrd->stat_head->ds_cnt)
2136         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2137                       rrd->stat_head->ds_cnt)) {
2138
2139         rrd_set_error("rrd_write cdp_prep to rrd");
2140         return -1;
2141     }
2142
2143     if (rrd_write(rrd_file, rrd->rra_ptr,
2144                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2145         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2146         rrd_set_error("rrd_write rra_ptr to rrd");
2147         return -1;
2148     }
2149     return 0;
2150 }
2151 #endif