Merge branch 'master' into ff/rrdd
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.1  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322
323 rrd_info_t *rrd_update_v(
324     int argc,
325     char **argv)
326 {
327     char     *tmplt = NULL;
328     rrd_info_t *result = NULL;
329     rrd_infoval_t rc;
330     struct option long_options[] = {
331         {"template", required_argument, 0, 't'},
332         {0, 0, 0, 0}
333     };
334
335     rc.u_int = -1;
336     optind = 0;
337     opterr = 0;         /* initialize getopt */
338
339     while (1) {
340         int       option_index = 0;
341         int       opt;
342
343         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344
345         if (opt == EOF)
346             break;
347
348         switch (opt) {
349         case 't':
350             tmplt = optarg;
351             break;
352
353         case '?':
354             rrd_set_error("unknown option '%s'", argv[optind - 1]);
355             goto end_tag;
356         }
357     }
358
359     /* need at least 2 arguments: filename, data. */
360     if (argc - optind < 2) {
361         rrd_set_error("Not enough arguments");
362         goto end_tag;
363     }
364     rc.u_int = 0;
365     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366     rc.u_int = _rrd_update(argv[optind], tmplt,
367                            argc - optind - 1,
368                            (const char **) (argv + optind + 1), result);
369     result->value.u_int = rc.u_int;
370   end_tag:
371     return result;
372 }
373
374 int rrd_update(
375     int argc,
376     char **argv)
377 {
378     struct option long_options[] = {
379         {"template", required_argument, 0, 't'},
380         {"daemon",   required_argument, 0, 'd'},
381         {0, 0, 0, 0}
382     };
383     int       option_index = 0;
384     int       opt;
385     char     *tmplt = NULL;
386     int       rc = -1;
387     char     *opt_daemon = NULL;
388
389     optind = 0;
390     opterr = 0;         /* initialize getopt */
391
392     while (1) {
393         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
394
395         if (opt == EOF)
396             break;
397
398         switch (opt) {
399         case 't':
400             tmplt = strdup(optarg);
401             break;
402
403         case 'd':
404             if (opt_daemon != NULL)
405                 free (opt_daemon);
406             opt_daemon = strdup (optarg);
407             if (opt_daemon == NULL)
408             {
409                 rrd_set_error("strdup failed.");
410                 goto out;
411             }
412             break;
413
414         case '?':
415             rrd_set_error("unknown option '%s'", argv[optind - 1]);
416             goto out;
417         }
418     }
419
420     /* need at least 2 arguments: filename, data. */
421     if (argc - optind < 2) {
422         rrd_set_error("Not enough arguments");
423         goto out;
424     }
425
426     if ((tmplt != NULL) && (opt_daemon != NULL))
427     {
428         rrd_set_error("The caching opt_daemon cannot be used together with "
429                 "templates yet.");
430         goto out;
431     }
432
433     if ((tmplt == NULL) && (opt_daemon == NULL))
434     {
435         char *temp;
436
437         temp = getenv (ENV_RRDCACHED_ADDRESS);
438         if (temp != NULL)
439         {
440             opt_daemon = strdup (temp);
441             if (opt_daemon == NULL)
442             {
443                 rrd_set_error("strdup failed.");
444                 goto out;
445             }
446         }
447     }
448
449     if (opt_daemon != NULL)
450     {
451         int status;
452
453         status = rrdc_connect (opt_daemon);
454         if (status != 0)
455         {
456             rrd_set_error("Unable to connect to opt_daemon: %s",
457                     (status < 0)
458                     ? "Internal error"
459                     : rrd_strerror (status));
460             goto out;
461         }
462
463         status = rrdc_update (/* file = */ argv[optind],
464                 /* values_num = */ argc - optind - 1,
465                 /* values = */ (void *) (argv + optind + 1));
466         if (status != 0)
467         {
468             rrd_set_error("Failed sending the values to the opt_daemon: %s",
469                     (status < 0)
470                     ? "Internal error"
471                     : rrd_strerror (status));
472         }
473         else
474         {
475             rc = 0;
476         }
477
478         rrdc_disconnect ();
479         goto out;
480     } /* if (opt_daemon != NULL) */
481
482     rc = rrd_update_r(argv[optind], tmplt,
483                       argc - optind - 1, (const char **) (argv + optind + 1));
484   out:
485     if (tmplt != NULL)
486     {
487         free(tmplt);
488         tmplt = NULL;
489     }
490     if (opt_daemon != NULL)
491     {
492         free (opt_daemon);
493         opt_daemon = NULL;
494     }
495     return rc;
496 }
497
498 int rrd_update_r(
499     const char *filename,
500     const char *tmplt,
501     int argc,
502     const char **argv)
503 {
504     return _rrd_update(filename, tmplt, argc, argv, NULL);
505 }
506
507 int _rrd_update(
508     const char *filename,
509     const char *tmplt,
510     int argc,
511     const char **argv,
512     rrd_info_t * pcdp_summary)
513 {
514
515     int       arg_i = 2;
516
517     unsigned long rra_begin;    /* byte pointer to the rra
518                                  * area in the rrd file.  this
519                                  * pointer never changes value */
520     unsigned long rra_current;  /* byte pointer to the current write
521                                  * spot in the rrd file. */
522     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
523                              * to the existing entry */
524     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
525                              * to the cdp values */
526
527     long     *tmpl_idx; /* index representing the settings
528                          * transported by the tmplt index */
529     unsigned long tmpl_cnt = 2; /* time and data */
530     rrd_t     rrd;
531     time_t    current_time = 0;
532     unsigned long current_time_usec = 0;    /* microseconds part of current time */
533     char    **updvals;
534     int       schedule_smooth = 0;
535
536     /* number of elapsed PDP steps since last update */
537     unsigned long *rra_step_cnt = NULL;
538
539     int       version;  /* rrd version */
540     rrd_file_t *rrd_file;
541     char     *arg_copy; /* for processing the argv */
542     unsigned long *skip_update; /* RRAs to advance but not write */
543
544     /* need at least 1 arguments: data. */
545     if (argc < 1) {
546         rrd_set_error("Not enough arguments");
547         goto err_out;
548     }
549
550     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
551         goto err_free;
552     }
553     /* We are now at the beginning of the rra's */
554     rra_current = rra_begin = rrd_file->header_len;
555
556     version = atoi(rrd.stat_head->version);
557
558     initialize_time(&current_time, &current_time_usec, version);
559
560     /* get exclusive lock to whole file.
561      * lock gets removed when we close the file.
562      */
563     if (rrd_lock(rrd_file) != 0) {
564         rrd_set_error("could not lock RRD");
565         goto err_close;
566     }
567
568     if (allocate_data_structures(&rrd, &updvals,
569                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
570                                  &rra_step_cnt, &skip_update,
571                                  &pdp_new) == -1) {
572         goto err_close;
573     }
574
575     /* loop through the arguments. */
576     for (arg_i = 0; arg_i < argc; arg_i++) {
577         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
578             rrd_set_error("failed duplication argv entry");
579             break;
580         }
581         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
582                         &current_time, &current_time_usec, pdp_temp, pdp_new,
583                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
584                         &pcdp_summary, version, skip_update,
585                         &schedule_smooth) == -1) {
586             if (rrd_test_error()) { /* Should have error string always here */
587                 char     *save_error;
588
589                 /* Prepend file name to error message */
590                 if ((save_error = strdup(rrd_get_error())) != NULL) {
591                     rrd_set_error("%s: %s", filename, save_error);
592                     free(save_error);
593                 }
594             }
595             free(arg_copy);
596             break;
597         }
598         free(arg_copy);
599     }
600
601     free(rra_step_cnt);
602
603     /* if we got here and if there is an error and if the file has not been
604      * written to, then close things up and return. */
605     if (rrd_test_error()) {
606         goto err_free_structures;
607     }
608 #ifndef HAVE_MMAP
609     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
610         goto err_free_structures;
611     }
612 #endif
613
614     /* calling the smoothing code here guarantees at most one smoothing
615      * operation per rrd_update call. Unfortunately, it is possible with bulk
616      * updates, or a long-delayed update for smoothing to occur off-schedule.
617      * This really isn't critical except during the burn-in cycles. */
618     if (schedule_smooth) {
619         smooth_all_rras(&rrd, rrd_file, rra_begin);
620     }
621
622 /*    rrd_dontneed(rrd_file,&rrd); */
623     rrd_free(&rrd);
624     rrd_close(rrd_file);
625
626     free(pdp_new);
627     free(tmpl_idx);
628     free(pdp_temp);
629     free(skip_update);
630     free(updvals);
631     return 0;
632
633   err_free_structures:
634     free(pdp_new);
635     free(tmpl_idx);
636     free(pdp_temp);
637     free(skip_update);
638     free(updvals);
639   err_close:
640     rrd_close(rrd_file);
641   err_free:
642     rrd_free(&rrd);
643   err_out:
644     return -1;
645 }
646
647 /*
648  * get exclusive lock to whole file.
649  * lock gets removed when we close the file
650  *
651  * returns 0 on success
652  */
653 int rrd_lock(
654     rrd_file_t *file)
655 {
656     int       rcstat;
657
658     {
659 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
660         struct _stat st;
661
662         if (_fstat(file->fd, &st) == 0) {
663             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
664         } else {
665             rcstat = -1;
666         }
667 #else
668         struct flock lock;
669
670         lock.l_type = F_WRLCK;  /* exclusive write lock */
671         lock.l_len = 0; /* whole file */
672         lock.l_start = 0;   /* start of file */
673         lock.l_whence = SEEK_SET;   /* end of file */
674
675         rcstat = fcntl(file->fd, F_SETLK, &lock);
676 #endif
677     }
678
679     return (rcstat);
680 }
681
682 /*
683  * Allocate some important arrays used, and initialize the template.
684  *
685  * When it returns, either all of the structures are allocated
686  * or none of them are.
687  *
688  * Returns 0 on success, -1 on error.
689  */
690 static int allocate_data_structures(
691     rrd_t *rrd,
692     char ***updvals,
693     rrd_value_t **pdp_temp,
694     const char *tmplt,
695     long **tmpl_idx,
696     unsigned long *tmpl_cnt,
697     unsigned long **rra_step_cnt,
698     unsigned long **skip_update,
699     rrd_value_t **pdp_new)
700 {
701     unsigned  i, ii;
702     if ((*updvals = (char **) malloc(sizeof(char *)
703                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
704         rrd_set_error("allocating updvals pointer array.");
705         return -1;
706     }
707     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
708                                             * rrd->stat_head->ds_cnt)) ==
709         NULL) {
710         rrd_set_error("allocating pdp_temp.");
711         goto err_free_updvals;
712     }
713     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
714                                                  *
715                                                  rrd->stat_head->rra_cnt)) ==
716         NULL) {
717         rrd_set_error("allocating skip_update.");
718         goto err_free_pdp_temp;
719     }
720     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
721                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
722         rrd_set_error("allocating tmpl_idx.");
723         goto err_free_skip_update;
724     }
725     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
726                                                   *
727                                                   (rrd->stat_head->
728                                                    rra_cnt))) == NULL) {
729         rrd_set_error("allocating rra_step_cnt.");
730         goto err_free_tmpl_idx;
731     }
732
733     /* initialize tmplt redirector */
734     /* default config example (assume DS 1 is a CDEF DS)
735        tmpl_idx[0] -> 0; (time)
736        tmpl_idx[1] -> 1; (DS 0)
737        tmpl_idx[2] -> 3; (DS 2)
738        tmpl_idx[3] -> 4; (DS 3) */
739     (*tmpl_idx)[0] = 0; /* time */
740     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
741         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
742             (*tmpl_idx)[ii++] = i;
743     }
744     *tmpl_cnt = ii;
745
746     if (tmplt != NULL) {
747         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
748             goto err_free_rra_step_cnt;
749         }
750     }
751
752     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
753                                            * rrd->stat_head->ds_cnt)) == NULL) {
754         rrd_set_error("allocating pdp_new.");
755         goto err_free_rra_step_cnt;
756     }
757
758     return 0;
759
760   err_free_rra_step_cnt:
761     free(*rra_step_cnt);
762   err_free_tmpl_idx:
763     free(*tmpl_idx);
764   err_free_skip_update:
765     free(*skip_update);
766   err_free_pdp_temp:
767     free(*pdp_temp);
768   err_free_updvals:
769     free(*updvals);
770     return -1;
771 }
772
773 /*
774  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
775  *
776  * Returns 0 on success.
777  */
778 static int parse_template(
779     rrd_t *rrd,
780     const char *tmplt,
781     unsigned long *tmpl_cnt,
782     long *tmpl_idx)
783 {
784     char     *dsname, *tmplt_copy;
785     unsigned int tmpl_len, i;
786     int       ret = 0;
787
788     *tmpl_cnt = 1;      /* the first entry is the time */
789
790     /* we should work on a writeable copy here */
791     if ((tmplt_copy = strdup(tmplt)) == NULL) {
792         rrd_set_error("error copying tmplt '%s'", tmplt);
793         ret = -1;
794         goto out;
795     }
796
797     dsname = tmplt_copy;
798     tmpl_len = strlen(tmplt_copy);
799     for (i = 0; i <= tmpl_len; i++) {
800         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
801             tmplt_copy[i] = '\0';
802             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
803                 rrd_set_error("tmplt contains more DS definitions than RRD");
804                 ret = -1;
805                 goto out_free_tmpl_copy;
806             }
807             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
808                 rrd_set_error("unknown DS name '%s'", dsname);
809                 ret = -1;
810                 goto out_free_tmpl_copy;
811             }
812             /* go to the next entry on the tmplt_copy */
813             if (i < tmpl_len)
814                 dsname = &tmplt_copy[i + 1];
815         }
816     }
817   out_free_tmpl_copy:
818     free(tmplt_copy);
819   out:
820     return ret;
821 }
822
823 /*
824  * Parse an update string, updates the primary data points (PDPs)
825  * and consolidated data points (CDPs), and writes changes to the RRAs.
826  *
827  * Returns 0 on success, -1 on error.
828  */
829 static int process_arg(
830     char *step_start,
831     rrd_t *rrd,
832     rrd_file_t *rrd_file,
833     unsigned long rra_begin,
834     unsigned long *rra_current,
835     time_t *current_time,
836     unsigned long *current_time_usec,
837     rrd_value_t *pdp_temp,
838     rrd_value_t *pdp_new,
839     unsigned long *rra_step_cnt,
840     char **updvals,
841     long *tmpl_idx,
842     unsigned long tmpl_cnt,
843     rrd_info_t ** pcdp_summary,
844     int version,
845     unsigned long *skip_update,
846     int *schedule_smooth)
847 {
848     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
849
850     /* a vector of future Holt-Winters seasonal coefs */
851     unsigned long elapsed_pdp_st;
852
853     double    interval, pre_int, post_int;  /* interval between this and
854                                              * the last run */
855     unsigned long proc_pdp_cnt;
856     unsigned long rra_start;
857
858     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
859                  current_time, current_time_usec, version) == -1) {
860         return -1;
861     }
862     /* seek to the beginning of the rra's */
863     if (*rra_current != rra_begin) {
864 #ifndef HAVE_MMAP
865         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
866             rrd_set_error("seek error in rrd");
867             return -1;
868         }
869 #endif
870         *rra_current = rra_begin;
871     }
872     rra_start = rra_begin;
873
874     interval = (double) (*current_time - rrd->live_head->last_up)
875         + (double) ((long) *current_time_usec -
876                     (long) rrd->live_head->last_up_usec) / 1e6f;
877
878     /* process the data sources and update the pdp_prep 
879      * area accordingly */
880     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
881         return -1;
882     }
883
884     elapsed_pdp_st = calculate_elapsed_steps(rrd,
885                                              *current_time,
886                                              *current_time_usec, interval,
887                                              &pre_int, &post_int,
888                                              &proc_pdp_cnt);
889
890     /* has a pdp_st moment occurred since the last run ? */
891     if (elapsed_pdp_st == 0) {
892         /* no we have not passed a pdp_st moment. therefore update is simple */
893         simple_update(rrd, interval, pdp_new);
894     } else {
895         /* an pdp_st has occurred. */
896         if (process_all_pdp_st(rrd, interval,
897                                pre_int, post_int,
898                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
899             return -1;
900         }
901         if (update_all_cdp_prep(rrd, rra_step_cnt,
902                                 rra_begin, rrd_file,
903                                 elapsed_pdp_st,
904                                 proc_pdp_cnt,
905                                 &last_seasonal_coef,
906                                 &seasonal_coef,
907                                 pdp_temp, rra_current,
908                                 skip_update, schedule_smooth) == -1) {
909             goto err_free_coefficients;
910         }
911         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
912                                  elapsed_pdp_st, pdp_temp,
913                                  &seasonal_coef) == -1) {
914             goto err_free_coefficients;
915         }
916         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
917                           rra_current, *current_time, skip_update,
918                           pcdp_summary) == -1) {
919             goto err_free_coefficients;
920         }
921     }                   /* endif a pdp_st has occurred */
922     rrd->live_head->last_up = *current_time;
923     rrd->live_head->last_up_usec = *current_time_usec;
924
925     if (version < 3) {
926         *rrd->legacy_last_up = rrd->live_head->last_up;
927     }
928     free(seasonal_coef);
929     free(last_seasonal_coef);
930     return 0;
931
932   err_free_coefficients:
933     free(seasonal_coef);
934     free(last_seasonal_coef);
935     return -1;
936 }
937
938 /*
939  * Parse a DS string (time + colon-separated values), storing the
940  * results in current_time, current_time_usec, and updvals.
941  *
942  * Returns 0 on success, -1 on error.
943  */
944 static int parse_ds(
945     rrd_t *rrd,
946     char **updvals,
947     long *tmpl_idx,
948     char *input,
949     unsigned long tmpl_cnt,
950     time_t *current_time,
951     unsigned long *current_time_usec,
952     int version)
953 {
954     char     *p;
955     unsigned long i;
956     char      timesyntax;
957
958     updvals[0] = input;
959     /* initialize all ds input to unknown except the first one
960        which has always got to be set */
961     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
962         updvals[i] = "U";
963
964     /* separate all ds elements; first must be examined separately
965        due to alternate time syntax */
966     if ((p = strchr(input, '@')) != NULL) {
967         timesyntax = '@';
968     } else if ((p = strchr(input, ':')) != NULL) {
969         timesyntax = ':';
970     } else {
971         rrd_set_error("expected timestamp not found in data source from %s",
972                       input);
973         return -1;
974     }
975     *p = '\0';
976     i = 1;
977     updvals[tmpl_idx[i++]] = p + 1;
978     while (*(++p)) {
979         if (*p == ':') {
980             *p = '\0';
981             if (i < tmpl_cnt) {
982                 updvals[tmpl_idx[i++]] = p + 1;
983             }
984         }
985     }
986
987     if (i != tmpl_cnt) {
988         rrd_set_error("expected %lu data source readings (got %lu) from %s",
989                       tmpl_cnt - 1, i, input);
990         return -1;
991     }
992
993     if (get_time_from_reading(rrd, timesyntax, updvals,
994                               current_time, current_time_usec,
995                               version) == -1) {
996         return -1;
997     }
998     return 0;
999 }
1000
1001 /*
1002  * Parse the time in a DS string, store it in current_time and 
1003  * current_time_usec and verify that it's later than the last
1004  * update for this DS.
1005  *
1006  * Returns 0 on success, -1 on error.
1007  */
1008 static int get_time_from_reading(
1009     rrd_t *rrd,
1010     char timesyntax,
1011     char **updvals,
1012     time_t *current_time,
1013     unsigned long *current_time_usec,
1014     int version)
1015 {
1016     double    tmp;
1017     char     *parsetime_error = NULL;
1018     char     *old_locale;
1019     rrd_time_value_t ds_tv;
1020     struct timeval tmp_time;    /* used for time conversion */
1021
1022     /* get the time from the reading ... handle N */
1023     if (timesyntax == '@') {    /* at-style */
1024         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1025             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1026             return -1;
1027         }
1028         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1029             ds_tv.type == RELATIVE_TO_START_TIME) {
1030             rrd_set_error("specifying time relative to the 'start' "
1031                           "or 'end' makes no sense here: %s", updvals[0]);
1032             return -1;
1033         }
1034         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1035         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1036     } else if (strcmp(updvals[0], "N") == 0) {
1037         gettimeofday(&tmp_time, 0);
1038         normalize_time(&tmp_time);
1039         *current_time = tmp_time.tv_sec;
1040         *current_time_usec = tmp_time.tv_usec;
1041     } else {
1042         old_locale = setlocale(LC_NUMERIC, "C");
1043         tmp = strtod(updvals[0], 0);
1044         setlocale(LC_NUMERIC, old_locale);
1045         *current_time = floor(tmp);
1046         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1047     }
1048     /* dont do any correction for old version RRDs */
1049     if (version < 3)
1050         *current_time_usec = 0;
1051
1052     if (*current_time < rrd->live_head->last_up ||
1053         (*current_time == rrd->live_head->last_up &&
1054          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1055         rrd_set_error("illegal attempt to update using time %ld when "
1056                       "last update time is %ld (minimum one second step)",
1057                       *current_time, rrd->live_head->last_up);
1058         return -1;
1059     }
1060     return 0;
1061 }
1062
1063 /*
1064  * Update pdp_new by interpreting the updvals according to the DS type
1065  * (COUNTER, GAUGE, etc.).
1066  *
1067  * Returns 0 on success, -1 on error.
1068  */
1069 static int update_pdp_prep(
1070     rrd_t *rrd,
1071     char **updvals,
1072     rrd_value_t *pdp_new,
1073     double interval)
1074 {
1075     unsigned long ds_idx;
1076     int       ii;
1077     char     *endptr;   /* used in the conversion */
1078     double    rate;
1079     char     *old_locale;
1080     enum dst_en dst_idx;
1081
1082     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1083         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1084
1085         /* make sure we do not build diffs with old last_ds values */
1086         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1087             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1088             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1089         }
1090
1091         /* NOTE: DST_CDEF should never enter this if block, because
1092          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1093          * accidently specified a value for the DST_CDEF. To handle this case,
1094          * an extra check is required. */
1095
1096         if ((updvals[ds_idx + 1][0] != 'U') &&
1097             (dst_idx != DST_CDEF) &&
1098             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1099             rate = DNAN;
1100
1101             /* pdp_new contains rate * time ... eg the bytes transferred during
1102              * the interval. Doing it this way saves a lot of math operations
1103              */
1104             switch (dst_idx) {
1105             case DST_COUNTER:
1106             case DST_DERIVE:
1107                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1108                     if ((updvals[ds_idx + 1][ii] < '0'
1109                          || updvals[ds_idx + 1][ii] > '9')
1110                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1111                         rrd_set_error("not a simple integer: '%s'",
1112                                       updvals[ds_idx + 1]);
1113                         return -1;
1114                     }
1115                 }
1116                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1117                     pdp_new[ds_idx] =
1118                         rrd_diff(updvals[ds_idx + 1],
1119                                  rrd->pdp_prep[ds_idx].last_ds);
1120                     if (dst_idx == DST_COUNTER) {
1121                         /* simple overflow catcher. This will fail
1122                          * terribly for non 32 or 64 bit counters
1123                          * ... are there any others in SNMP land?
1124                          */
1125                         if (pdp_new[ds_idx] < (double) 0.0)
1126                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1127                         if (pdp_new[ds_idx] < (double) 0.0)
1128                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1129                     }
1130                     rate = pdp_new[ds_idx] / interval;
1131                 } else {
1132                     pdp_new[ds_idx] = DNAN;
1133                 }
1134                 break;
1135             case DST_ABSOLUTE:
1136                 old_locale = setlocale(LC_NUMERIC, "C");
1137                 errno = 0;
1138                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1139                 setlocale(LC_NUMERIC, old_locale);
1140                 if (errno > 0) {
1141                     rrd_set_error("converting '%s' to float: %s",
1142                                   updvals[ds_idx + 1], rrd_strerror(errno));
1143                     return -1;
1144                 };
1145                 if (endptr[0] != '\0') {
1146                     rrd_set_error
1147                         ("conversion of '%s' to float not complete: tail '%s'",
1148                          updvals[ds_idx + 1], endptr);
1149                     return -1;
1150                 }
1151                 rate = pdp_new[ds_idx] / interval;
1152                 break;
1153             case DST_GAUGE:
1154                 errno = 0;
1155                 old_locale = setlocale(LC_NUMERIC, "C");
1156                 pdp_new[ds_idx] =
1157                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1158                 setlocale(LC_NUMERIC, old_locale);
1159                 if (errno) {
1160                     rrd_set_error("converting '%s' to float: %s",
1161                                   updvals[ds_idx + 1], rrd_strerror(errno));
1162                     return -1;
1163                 };
1164                 if (endptr[0] != '\0') {
1165                     rrd_set_error
1166                         ("conversion of '%s' to float not complete: tail '%s'",
1167                          updvals[ds_idx + 1], endptr);
1168                     return -1;
1169                 }
1170                 rate = pdp_new[ds_idx] / interval;
1171                 break;
1172             default:
1173                 rrd_set_error("rrd contains unknown DS type : '%s'",
1174                               rrd->ds_def[ds_idx].dst);
1175                 return -1;
1176             }
1177             /* break out of this for loop if the error string is set */
1178             if (rrd_test_error()) {
1179                 return -1;
1180             }
1181             /* make sure pdp_temp is neither too large or too small
1182              * if any of these occur it becomes unknown ...
1183              * sorry folks ... */
1184             if (!isnan(rate) &&
1185                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1186                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1187                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1188                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1189                 pdp_new[ds_idx] = DNAN;
1190             }
1191         } else {
1192             /* no news is news all the same */
1193             pdp_new[ds_idx] = DNAN;
1194         }
1195
1196
1197         /* make a copy of the command line argument for the next run */
1198 #ifdef DEBUG
1199         fprintf(stderr, "prep ds[%lu]\t"
1200                 "last_arg '%s'\t"
1201                 "this_arg '%s'\t"
1202                 "pdp_new %10.2f\n",
1203                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1204                 pdp_new[ds_idx]);
1205 #endif
1206         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1207                 LAST_DS_LEN - 1);
1208         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1209     }
1210     return 0;
1211 }
1212
1213 /*
1214  * How many PDP steps have elapsed since the last update? Returns the answer,
1215  * and stores the time between the last update and the last PDP in pre_time,
1216  * and the time between the last PDP and the current time in post_int.
1217  */
1218 static int calculate_elapsed_steps(
1219     rrd_t *rrd,
1220     unsigned long current_time,
1221     unsigned long current_time_usec,
1222     double interval,
1223     double *pre_int,
1224     double *post_int,
1225     unsigned long *proc_pdp_cnt)
1226 {
1227     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1228     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1229                                  * time */
1230     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1231                                  * when it was last updated */
1232     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1233
1234     /* when was the current pdp started */
1235     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1236     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1237
1238     /* when did the last pdp_st occur */
1239     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1240     occu_pdp_st = current_time - occu_pdp_age;
1241
1242     if (occu_pdp_st > proc_pdp_st) {
1243         /* OK we passed the pdp_st moment */
1244         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1245                                                                      * occurred before the latest
1246                                                                      * pdp_st moment*/
1247         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1248         *post_int = occu_pdp_age;   /* how much after it */
1249         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1250     } else {
1251         *pre_int = interval;
1252         *post_int = 0;
1253     }
1254
1255     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1256
1257 #ifdef DEBUG
1258     printf("proc_pdp_age %lu\t"
1259            "proc_pdp_st %lu\t"
1260            "occu_pfp_age %lu\t"
1261            "occu_pdp_st %lu\t"
1262            "int %lf\t"
1263            "pre_int %lf\t"
1264            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1265            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1266 #endif
1267
1268     /* compute the number of elapsed pdp_st moments */
1269     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1270 }
1271
1272 /*
1273  * Increment the PDP values by the values in pdp_new, or else initialize them.
1274  */
1275 static void simple_update(
1276     rrd_t *rrd,
1277     double interval,
1278     rrd_value_t *pdp_new)
1279 {
1280     int       i;
1281
1282     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1283         if (isnan(pdp_new[i])) {
1284             /* this is not really accurate if we use subsecond data arrival time
1285                should have thought of it when going subsecond resolution ...
1286                sorry next format change we will have it! */
1287             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1288                 floor(interval);
1289         } else {
1290             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1291                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1292             } else {
1293                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1294             }
1295         }
1296 #ifdef DEBUG
1297         fprintf(stderr,
1298                 "NO PDP  ds[%i]\t"
1299                 "value %10.2f\t"
1300                 "unkn_sec %5lu\n",
1301                 i,
1302                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1303                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1304 #endif
1305     }
1306 }
1307
1308 /*
1309  * Call process_pdp_st for each DS.
1310  *
1311  * Returns 0 on success, -1 on error.
1312  */
1313 static int process_all_pdp_st(
1314     rrd_t *rrd,
1315     double interval,
1316     double pre_int,
1317     double post_int,
1318     unsigned long elapsed_pdp_st,
1319     rrd_value_t *pdp_new,
1320     rrd_value_t *pdp_temp)
1321 {
1322     unsigned long ds_idx;
1323
1324     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1325        rate*seconds which occurred up to the last run.
1326        pdp_new[] contains rate*seconds from the latest run.
1327        pdp_temp[] will contain the rate for cdp */
1328
1329     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1330         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1331                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1332                            pdp_new, pdp_temp) == -1) {
1333             return -1;
1334         }
1335 #ifdef DEBUG
1336         fprintf(stderr, "PDP UPD ds[%lu]\t"
1337                 "elapsed_pdp_st %lu\t"
1338                 "pdp_temp %10.2f\t"
1339                 "new_prep %10.2f\t"
1340                 "new_unkn_sec %5lu\n",
1341                 ds_idx,
1342                 elapsed_pdp_st,
1343                 pdp_temp[ds_idx],
1344                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1345                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1346 #endif
1347     }
1348     return 0;
1349 }
1350
1351 /*
1352  * Process an update that occurs after one of the PDP moments.
1353  * Increments the PDP value, sets NAN if time greater than the
1354  * heartbeats have elapsed, processes CDEFs.
1355  *
1356  * Returns 0 on success, -1 on error.
1357  */
1358 static int process_pdp_st(
1359     rrd_t *rrd,
1360     unsigned long ds_idx,
1361     double interval,
1362     double pre_int,
1363     double post_int,
1364     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1365     rrd_value_t *pdp_new,
1366     rrd_value_t *pdp_temp)
1367 {
1368     int       i;
1369
1370     /* update pdp_prep to the current pdp_st. */
1371     double    pre_unknown = 0.0;
1372     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1373     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1374
1375     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1376
1377     rpnstack_init(&rpnstack);
1378
1379
1380     if (isnan(pdp_new[ds_idx])) {
1381         /* a final bit of unknown to be added before calculation
1382            we use a temporary variable for this so that we
1383            don't have to turn integer lines before using the value */
1384         pre_unknown = pre_int;
1385     } else {
1386         if (isnan(scratch[PDP_val].u_val)) {
1387             scratch[PDP_val].u_val = 0;
1388         }
1389         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1390     }
1391
1392     /* if too much of the pdp_prep is unknown we dump it */
1393     /* if the interval is larger thatn mrhb we get NAN */
1394     if ((interval > mrhb) ||
1395         (rrd->stat_head->pdp_step / 2.0 <
1396          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1397         pdp_temp[ds_idx] = DNAN;
1398     } else {
1399         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1400             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1401              pre_unknown);
1402     }
1403
1404     /* process CDEF data sources; remember each CDEF DS can
1405      * only reference other DS with a lower index number */
1406     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1407         rpnp_t   *rpnp;
1408
1409         rpnp =
1410             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1411         /* substitute data values for OP_VARIABLE nodes */
1412         for (i = 0; rpnp[i].op != OP_END; i++) {
1413             if (rpnp[i].op == OP_VARIABLE) {
1414                 rpnp[i].op = OP_NUMBER;
1415                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1416             }
1417         }
1418         /* run the rpn calculator */
1419         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1420             free(rpnp);
1421             rpnstack_free(&rpnstack);
1422             return -1;
1423         }
1424     }
1425
1426     /* make pdp_prep ready for the next run */
1427     if (isnan(pdp_new[ds_idx])) {
1428         /* this is not realy accurate if we use subsecond data arival time
1429            should have thought of it when going subsecond resolution ...
1430            sorry next format change we will have it! */
1431         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1432         scratch[PDP_val].u_val = DNAN;
1433     } else {
1434         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1435         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1436     }
1437     rpnstack_free(&rpnstack);
1438     return 0;
1439 }
1440
1441 /*
1442  * Iterate over all the RRAs for a given DS and:
1443  * 1. Decide whether to schedule a smooth later
1444  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1445  * 3. Update the CDP
1446  *
1447  * Returns 0 on success, -1 on error
1448  */
1449 static int update_all_cdp_prep(
1450     rrd_t *rrd,
1451     unsigned long *rra_step_cnt,
1452     unsigned long rra_begin,
1453     rrd_file_t *rrd_file,
1454     unsigned long elapsed_pdp_st,
1455     unsigned long proc_pdp_cnt,
1456     rrd_value_t **last_seasonal_coef,
1457     rrd_value_t **seasonal_coef,
1458     rrd_value_t *pdp_temp,
1459     unsigned long *rra_current,
1460     unsigned long *skip_update,
1461     int *schedule_smooth)
1462 {
1463     unsigned long rra_idx;
1464
1465     /* index into the CDP scratch array */
1466     enum cf_en current_cf;
1467     unsigned long rra_start;
1468
1469     /* number of rows to be updated in an RRA for a data value. */
1470     unsigned long start_pdp_offset;
1471
1472     rra_start = rra_begin;
1473     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1474         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1475         start_pdp_offset =
1476             rrd->rra_def[rra_idx].pdp_cnt -
1477             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1478         skip_update[rra_idx] = 0;
1479         if (start_pdp_offset <= elapsed_pdp_st) {
1480             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1481                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1482         } else {
1483             rra_step_cnt[rra_idx] = 0;
1484         }
1485
1486         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1487             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1488              * so that they will be correct for the next observed value; note that for
1489              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1490              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1491             if (rra_step_cnt[rra_idx] > 1) {
1492                 skip_update[rra_idx] = 1;
1493                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1494                                 elapsed_pdp_st, last_seasonal_coef);
1495                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1496                                 elapsed_pdp_st + 1, seasonal_coef);
1497             }
1498             /* periodically run a smoother for seasonal effects */
1499             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1500 #ifdef DEBUG
1501                 fprintf(stderr,
1502                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1503                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1504                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1505                         u_cnt);
1506 #endif
1507                 *schedule_smooth = 1;
1508             }
1509             *rra_current = rrd_tell(rrd_file);
1510         }
1511         if (rrd_test_error())
1512             return -1;
1513
1514         if (update_cdp_prep
1515             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1516              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1517              current_cf) == -1) {
1518             return -1;
1519         }
1520         rra_start +=
1521             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1522             sizeof(rrd_value_t);
1523     }
1524     return 0;
1525 }
1526
1527 /* 
1528  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1529  */
1530 static int do_schedule_smooth(
1531     rrd_t *rrd,
1532     unsigned long rra_idx,
1533     unsigned long elapsed_pdp_st)
1534 {
1535     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1536     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1537     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1538     unsigned long seasonal_smooth_idx =
1539         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1540     unsigned long *init_seasonal =
1541         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1542
1543     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1544      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1545      * really an RRA level, not a data source within RRA level parameter, but
1546      * the rra_def is read only for rrd_update (not flushed to disk). */
1547     if (*init_seasonal > BURNIN_CYCLES) {
1548         /* someone has no doubt invented a trick to deal with this wrap around,
1549          * but at least this code is clear. */
1550         if (seasonal_smooth_idx > cur_row) {
1551             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1552              * between PDP and CDP */
1553             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1554         }
1555         /* can't rely on negative numbers because we are working with
1556          * unsigned values */
1557         return (cur_row + elapsed_pdp_st >= row_cnt
1558                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1559     }
1560     /* mark off one of the burn-in cycles */
1561     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1562 }
1563
1564 /*
1565  * For a given RRA, iterate over the data sources and call the appropriate
1566  * consolidation function.
1567  *
1568  * Returns 0 on success, -1 on error.
1569  */
1570 static int update_cdp_prep(
1571     rrd_t *rrd,
1572     unsigned long elapsed_pdp_st,
1573     unsigned long start_pdp_offset,
1574     unsigned long *rra_step_cnt,
1575     int rra_idx,
1576     rrd_value_t *pdp_temp,
1577     rrd_value_t *last_seasonal_coef,
1578     rrd_value_t *seasonal_coef,
1579     int current_cf)
1580 {
1581     unsigned long ds_idx, cdp_idx;
1582
1583     /* update CDP_PREP areas */
1584     /* loop over data soures within each RRA */
1585     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1586
1587         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1588
1589         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1590             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1591                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1592                        elapsed_pdp_st, start_pdp_offset,
1593                        rrd->rra_def[rra_idx].pdp_cnt,
1594                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1595                        rra_idx, ds_idx);
1596         } else {
1597             /* Nothing to consolidate if there's one PDP per CDP. However, if
1598              * we've missed some PDPs, let's update null counters etc. */
1599             if (elapsed_pdp_st > 2) {
1600                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1601                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1602                           current_cf);
1603             }
1604         }
1605
1606         if (rrd_test_error())
1607             return -1;
1608     }                   /* endif data sources loop */
1609     return 0;
1610 }
1611
1612 /*
1613  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1614  * primary value, secondary value, and # of unknowns.
1615  */
1616 static void update_cdp(
1617     unival *scratch,
1618     int current_cf,
1619     rrd_value_t pdp_temp_val,
1620     unsigned long rra_step_cnt,
1621     unsigned long elapsed_pdp_st,
1622     unsigned long start_pdp_offset,
1623     unsigned long pdp_cnt,
1624     rrd_value_t xff,
1625     int i,
1626     int ii)
1627 {
1628     /* shorthand variables */
1629     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1630     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1631     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1632     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1633
1634     if (rra_step_cnt) {
1635         /* If we are in this block, as least 1 CDP value will be written to
1636          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1637          * to be written, then the "fill in" value is the CDP_secondary_val
1638          * entry. */
1639         if (isnan(pdp_temp_val)) {
1640             *cdp_unkn_pdp_cnt += start_pdp_offset;
1641             *cdp_secondary_val = DNAN;
1642         } else {
1643             /* CDP_secondary value is the RRA "fill in" value for intermediary
1644              * CDP data entries. No matter the CF, the value is the same because
1645              * the average, max, min, and last of a list of identical values is
1646              * the same, namely, the value itself. */
1647             *cdp_secondary_val = pdp_temp_val;
1648         }
1649
1650         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1651             *cdp_primary_val = DNAN;
1652             if (current_cf == CF_AVERAGE) {
1653                 *cdp_val =
1654                     initialize_average_carry_over(pdp_temp_val,
1655                                                   elapsed_pdp_st,
1656                                                   start_pdp_offset, pdp_cnt);
1657             } else {
1658                 *cdp_val = pdp_temp_val;
1659             }
1660         } else {
1661             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1662                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1663         }               /* endif meets xff value requirement for a valid value */
1664         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1665          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1666         if (isnan(pdp_temp_val))
1667             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1668         else
1669             *cdp_unkn_pdp_cnt = 0;
1670     } else {            /* rra_step_cnt[i]  == 0 */
1671
1672 #ifdef DEBUG
1673         if (isnan(*cdp_val)) {
1674             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1675                     i, ii);
1676         } else {
1677             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1678                     i, ii, *cdp_val);
1679         }
1680 #endif
1681         if (isnan(pdp_temp_val)) {
1682             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1683         } else {
1684             *cdp_val =
1685                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1686                                   current_cf, i, ii);
1687         }
1688     }
1689 }
1690
1691 /*
1692  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1693  * on the type of consolidation function.
1694  */
1695 static void initialize_cdp_val(
1696     unival *scratch,
1697     int current_cf,
1698     rrd_value_t pdp_temp_val,
1699     unsigned long elapsed_pdp_st,
1700     unsigned long start_pdp_offset,
1701     unsigned long pdp_cnt)
1702 {
1703     rrd_value_t cum_val, cur_val;
1704
1705     switch (current_cf) {
1706     case CF_AVERAGE:
1707         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1708         cur_val = IFDNAN(pdp_temp_val, 0.0);
1709         scratch[CDP_primary_val].u_val =
1710             (cum_val + cur_val * start_pdp_offset) /
1711             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1712         scratch[CDP_val].u_val =
1713             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1714                                           start_pdp_offset, pdp_cnt);
1715         break;
1716     case CF_MAXIMUM:
1717         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1718         cur_val = IFDNAN(pdp_temp_val, -DINF);
1719 #if 0
1720 #ifdef DEBUG
1721         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1722             fprintf(stderr,
1723                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1724                     i, ii);
1725             exit(-1);
1726         }
1727 #endif
1728 #endif
1729         if (cur_val > cum_val)
1730             scratch[CDP_primary_val].u_val = cur_val;
1731         else
1732             scratch[CDP_primary_val].u_val = cum_val;
1733         /* initialize carry over value */
1734         scratch[CDP_val].u_val = pdp_temp_val;
1735         break;
1736     case CF_MINIMUM:
1737         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1738         cur_val = IFDNAN(pdp_temp_val, DINF);
1739 #if 0
1740 #ifdef DEBUG
1741         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1742             fprintf(stderr,
1743                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1744                     ii);
1745             exit(-1);
1746         }
1747 #endif
1748 #endif
1749         if (cur_val < cum_val)
1750             scratch[CDP_primary_val].u_val = cur_val;
1751         else
1752             scratch[CDP_primary_val].u_val = cum_val;
1753         /* initialize carry over value */
1754         scratch[CDP_val].u_val = pdp_temp_val;
1755         break;
1756     case CF_LAST:
1757     default:
1758         scratch[CDP_primary_val].u_val = pdp_temp_val;
1759         /* initialize carry over value */
1760         scratch[CDP_val].u_val = pdp_temp_val;
1761         break;
1762     }
1763 }
1764
1765 /*
1766  * Update the consolidation function for Holt-Winters functions as
1767  * well as other functions that don't actually consolidate multiple
1768  * PDPs.
1769  */
1770 static void reset_cdp(
1771     rrd_t *rrd,
1772     unsigned long elapsed_pdp_st,
1773     rrd_value_t *pdp_temp,
1774     rrd_value_t *last_seasonal_coef,
1775     rrd_value_t *seasonal_coef,
1776     int rra_idx,
1777     int ds_idx,
1778     int cdp_idx,
1779     enum cf_en current_cf)
1780 {
1781     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1782
1783     switch (current_cf) {
1784     case CF_AVERAGE:
1785     default:
1786         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1787         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1788         break;
1789     case CF_SEASONAL:
1790     case CF_DEVSEASONAL:
1791         /* need to update cached seasonal values, so they are consistent
1792          * with the bulk update */
1793         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1794          * CDP_last_deviation are the same. */
1795         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1796         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1797         break;
1798     case CF_HWPREDICT:
1799     case CF_MHWPREDICT:
1800         /* need to update the null_count and last_null_count.
1801          * even do this for non-DNAN pdp_temp because the
1802          * algorithm is not learning from batch updates. */
1803         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1804         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1805         /* fall through */
1806     case CF_DEVPREDICT:
1807         scratch[CDP_primary_val].u_val = DNAN;
1808         scratch[CDP_secondary_val].u_val = DNAN;
1809         break;
1810     case CF_FAILURES:
1811         /* do not count missed bulk values as failures */
1812         scratch[CDP_primary_val].u_val = 0;
1813         scratch[CDP_secondary_val].u_val = 0;
1814         /* need to reset violations buffer.
1815          * could do this more carefully, but for now, just
1816          * assume a bulk update wipes away all violations. */
1817         erase_violations(rrd, cdp_idx, rra_idx);
1818         break;
1819     }
1820 }
1821
1822 static rrd_value_t initialize_average_carry_over(
1823     rrd_value_t pdp_temp_val,
1824     unsigned long elapsed_pdp_st,
1825     unsigned long start_pdp_offset,
1826     unsigned long pdp_cnt)
1827 {
1828     /* initialize carry over value */
1829     if (isnan(pdp_temp_val)) {
1830         return DNAN;
1831     }
1832     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1833 }
1834
1835 /*
1836  * Update or initialize a CDP value based on the consolidation
1837  * function.
1838  *
1839  * Returns the new value.
1840  */
1841 static rrd_value_t calculate_cdp_val(
1842     rrd_value_t cdp_val,
1843     rrd_value_t pdp_temp_val,
1844     unsigned long elapsed_pdp_st,
1845     int current_cf,
1846 #ifdef DEBUG
1847     int i,
1848     int ii
1849 #else
1850     int UNUSED(i),
1851     int UNUSED(ii)
1852 #endif
1853     )
1854 {
1855     if (isnan(cdp_val)) {
1856         if (current_cf == CF_AVERAGE) {
1857             pdp_temp_val *= elapsed_pdp_st;
1858         }
1859 #ifdef DEBUG
1860         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1861                 i, ii, pdp_temp_val);
1862 #endif
1863         return pdp_temp_val;
1864     }
1865     if (current_cf == CF_AVERAGE)
1866         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1867     if (current_cf == CF_MINIMUM)
1868         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1869     if (current_cf == CF_MAXIMUM)
1870         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1871
1872     return pdp_temp_val;
1873 }
1874
1875 /*
1876  * For each RRA, update the seasonal values and then call update_aberrant_CF
1877  * for each data source.
1878  *
1879  * Return 0 on success, -1 on error.
1880  */
1881 static int update_aberrant_cdps(
1882     rrd_t *rrd,
1883     rrd_file_t *rrd_file,
1884     unsigned long rra_begin,
1885     unsigned long *rra_current,
1886     unsigned long elapsed_pdp_st,
1887     rrd_value_t *pdp_temp,
1888     rrd_value_t **seasonal_coef)
1889 {
1890     unsigned long rra_idx, ds_idx, j;
1891
1892     /* number of PDP steps since the last update that
1893      * are assigned to the first CDP to be generated
1894      * since the last update. */
1895     unsigned short scratch_idx;
1896     unsigned long rra_start;
1897     enum cf_en current_cf;
1898
1899     /* this loop is only entered if elapsed_pdp_st < 3 */
1900     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1901          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1902         rra_start = rra_begin;
1903         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1904             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1905                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1906                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1907                     if (scratch_idx == CDP_primary_val) {
1908                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1909                                         elapsed_pdp_st + 1, seasonal_coef);
1910                     } else {
1911                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1912                                         elapsed_pdp_st + 2, seasonal_coef);
1913                     }
1914                     *rra_current = rrd_tell(rrd_file);
1915                 }
1916                 if (rrd_test_error())
1917                     return -1;
1918                 /* loop over data soures within each RRA */
1919                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1920                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1921                                        rra_idx * (rrd->stat_head->ds_cnt) +
1922                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1923                                        *seasonal_coef);
1924                 }
1925             }
1926             rra_start += rrd->rra_def[rra_idx].row_cnt
1927                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1928         }
1929     }
1930     return 0;
1931 }
1932
1933 /* 
1934  * Move sequentially through the file, writing one RRA at a time.  Note this
1935  * architecture divorces the computation of CDP with flushing updated RRA
1936  * entries to disk.
1937  *
1938  * Return 0 on success, -1 on error.
1939  */
1940 static int write_to_rras(
1941     rrd_t *rrd,
1942     rrd_file_t *rrd_file,
1943     unsigned long *rra_step_cnt,
1944     unsigned long rra_begin,
1945     unsigned long *rra_current,
1946     time_t current_time,
1947     unsigned long *skip_update,
1948     rrd_info_t ** pcdp_summary)
1949 {
1950     unsigned long rra_idx;
1951     unsigned long rra_start;
1952     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1953     time_t    rra_time = 0; /* time of update for a RRA */
1954
1955     /* Ready to write to disk */
1956     rra_start = rra_begin;
1957     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1958         /* skip unless there's something to write */
1959         if (rra_step_cnt[rra_idx]) {
1960             /* write the first row */
1961 #ifdef DEBUG
1962             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1963 #endif
1964             rrd->rra_ptr[rra_idx].cur_row++;
1965             if (rrd->rra_ptr[rra_idx].cur_row >=
1966                 rrd->rra_def[rra_idx].row_cnt)
1967                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1968             /* position on the first row */
1969             rra_pos_tmp = rra_start +
1970                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1971                 sizeof(rrd_value_t);
1972             if (rra_pos_tmp != *rra_current) {
1973                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1974                     rrd_set_error("seek error in rrd");
1975                     return -1;
1976                 }
1977                 *rra_current = rra_pos_tmp;
1978             }
1979 #ifdef DEBUG
1980             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1981 #endif
1982             if (!skip_update[rra_idx]) {
1983                 if (*pcdp_summary != NULL) {
1984                     rra_time = (current_time - current_time
1985                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1986                                    rrd->stat_head->pdp_step))
1987                         -
1988                         ((rra_step_cnt[rra_idx] -
1989                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1990                          rrd->stat_head->pdp_step);
1991                 }
1992                 if (write_RRA_row
1993                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1994                      pcdp_summary, rra_time) == -1)
1995                     return -1;
1996             }
1997
1998             /* write other rows of the bulk update, if any */
1999             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
2000                 if (++rrd->rra_ptr[rra_idx].cur_row ==
2001                     rrd->rra_def[rra_idx].row_cnt) {
2002 #ifdef DEBUG
2003                     fprintf(stderr,
2004                             "Wraparound for RRA %s, %lu updates left\n",
2005                             rrd->rra_def[rra_idx].cf_nam,
2006                             rra_step_cnt[rra_idx] - 1);
2007 #endif
2008                     /* wrap */
2009                     rrd->rra_ptr[rra_idx].cur_row = 0;
2010                     /* seek back to beginning of current rra */
2011                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2012                         rrd_set_error("seek error in rrd");
2013                         return -1;
2014                     }
2015 #ifdef DEBUG
2016                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
2017                             rrd_file->pos);
2018 #endif
2019                     *rra_current = rra_start;
2020                 }
2021                 if (!skip_update[rra_idx]) {
2022                     if (*pcdp_summary != NULL) {
2023                         rra_time = (current_time - current_time
2024                                     % (rrd->rra_def[rra_idx].pdp_cnt *
2025                                        rrd->stat_head->pdp_step))
2026                             -
2027                             ((rra_step_cnt[rra_idx] -
2028                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2029                              rrd->stat_head->pdp_step);
2030                     }
2031                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2032                                       CDP_secondary_val, pcdp_summary,
2033                                       rra_time) == -1)
2034                         return -1;
2035                 }
2036             }
2037         }
2038         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2039             sizeof(rrd_value_t);
2040     }                   /* RRA LOOP */
2041
2042     return 0;
2043 }
2044
2045 /*
2046  * Write out one row of values (one value per DS) to the archive.
2047  *
2048  * Returns 0 on success, -1 on error.
2049  */
2050 static int write_RRA_row(
2051     rrd_file_t *rrd_file,
2052     rrd_t *rrd,
2053     unsigned long rra_idx,
2054     unsigned long *rra_current,
2055     unsigned short CDP_scratch_idx,
2056     rrd_info_t ** pcdp_summary,
2057     time_t rra_time)
2058 {
2059     unsigned long ds_idx, cdp_idx;
2060     rrd_infoval_t iv;
2061
2062     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2063         /* compute the cdp index */
2064         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2065 #ifdef DEBUG
2066         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2067                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2068                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2069 #endif
2070         if (*pcdp_summary != NULL) {
2071             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2072             /* append info to the return hash */
2073             *pcdp_summary = rrd_info_push(*pcdp_summary,
2074                                           sprintf_alloc
2075                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2076                                            rrd->rra_def[rra_idx].cf_nam,
2077                                            rrd->rra_def[rra_idx].pdp_cnt,
2078                                            rrd->ds_def[ds_idx].ds_nam),
2079                                           RD_I_VAL, iv);
2080         }
2081         if (rrd_write(rrd_file,
2082                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2083                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2084             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2085             return -1;
2086         }
2087         *rra_current += sizeof(rrd_value_t);
2088     }
2089     return 0;
2090 }
2091
2092 /*
2093  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2094  *
2095  * Returns 0 on success, -1 otherwise
2096  */
2097 static int smooth_all_rras(
2098     rrd_t *rrd,
2099     rrd_file_t *rrd_file,
2100     unsigned long rra_begin)
2101 {
2102     unsigned long rra_start = rra_begin;
2103     unsigned long rra_idx;
2104
2105     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2106         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2107             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2108 #ifdef DEBUG
2109             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2110 #endif
2111             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2112             if (rrd_test_error())
2113                 return -1;
2114         }
2115         rra_start += rrd->rra_def[rra_idx].row_cnt
2116             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2117     }
2118     return 0;
2119 }
2120
2121 #ifndef HAVE_MMAP
2122 /*
2123  * Flush changes to disk (unless we're using mmap)
2124  *
2125  * Returns 0 on success, -1 otherwise
2126  */
2127 static int write_changes_to_disk(
2128     rrd_t *rrd,
2129     rrd_file_t *rrd_file,
2130     int version)
2131 {
2132     /* we just need to write back the live header portion now */
2133     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2134                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2135                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2136                  SEEK_SET) != 0) {
2137         rrd_set_error("seek rrd for live header writeback");
2138         return -1;
2139     }
2140     if (version >= 3) {
2141         if (rrd_write(rrd_file, rrd->live_head,
2142                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2143             rrd_set_error("rrd_write live_head to rrd");
2144             return -1;
2145         }
2146     } else {
2147         if (rrd_write(rrd_file, rrd->legacy_last_up,
2148                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2149             rrd_set_error("rrd_write live_head to rrd");
2150             return -1;
2151         }
2152     }
2153
2154
2155     if (rrd_write(rrd_file, rrd->pdp_prep,
2156                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2157         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2158         rrd_set_error("rrd_write pdp_prep to rrd");
2159         return -1;
2160     }
2161
2162     if (rrd_write(rrd_file, rrd->cdp_prep,
2163                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2164                   rrd->stat_head->ds_cnt)
2165         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2166                       rrd->stat_head->ds_cnt)) {
2167
2168         rrd_set_error("rrd_write cdp_prep to rrd");
2169         return -1;
2170     }
2171
2172     if (rrd_write(rrd_file, rrd->rra_ptr,
2173                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2174         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2175         rrd_set_error("rrd_write rra_ptr to rrd");
2176         return -1;
2177     }
2178     return 0;
2179 }
2180 #endif