Merge branch 'master' into ff/rrdd
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322
323 rrd_info_t *rrd_update_v(
324     int argc,
325     char **argv)
326 {
327     char     *tmplt = NULL;
328     rrd_info_t *result = NULL;
329     rrd_infoval_t rc;
330     struct option long_options[] = {
331         {"template", required_argument, 0, 't'},
332         {0, 0, 0, 0}
333     };
334
335     rc.u_int = -1;
336     optind = 0;
337     opterr = 0;         /* initialize getopt */
338
339     while (1) {
340         int       option_index = 0;
341         int       opt;
342
343         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344
345         if (opt == EOF)
346             break;
347
348         switch (opt) {
349         case 't':
350             tmplt = optarg;
351             break;
352
353         case '?':
354             rrd_set_error("unknown option '%s'", argv[optind - 1]);
355             goto end_tag;
356         }
357     }
358
359     /* need at least 2 arguments: filename, data. */
360     if (argc - optind < 2) {
361         rrd_set_error("Not enough arguments");
362         goto end_tag;
363     }
364     rc.u_int = 0;
365     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366     rc.u_int = _rrd_update(argv[optind], tmplt,
367                            argc - optind - 1,
368                            (const char **) (argv + optind + 1), result);
369     result->value.u_int = rc.u_int;
370   end_tag:
371     return result;
372 }
373
374 int rrd_update(
375     int argc,
376     char **argv)
377 {
378     struct option long_options[] = {
379         {"template", required_argument, 0, 't'},
380         {"daemon",   required_argument, 0, 'd'},
381         {0, 0, 0, 0}
382     };
383     int       option_index = 0;
384     int       opt;
385     char     *tmplt = NULL;
386     int       rc = -1;
387     char     *daemon = NULL;
388
389     optind = 0;
390     opterr = 0;         /* initialize getopt */
391
392     while (1) {
393         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
394
395         if (opt == EOF)
396             break;
397
398         switch (opt) {
399         case 't':
400             tmplt = strdup(optarg);
401             break;
402
403         case 'd':
404             if (daemon != NULL)
405                 free (daemon);
406             daemon = strdup (optarg);
407             if (daemon == NULL)
408             {
409                 rrd_set_error("strdup failed.");
410                 goto out;
411             }
412             break;
413
414         case '?':
415             rrd_set_error("unknown option '%s'", argv[optind - 1]);
416             goto out;
417         }
418     }
419
420     /* need at least 2 arguments: filename, data. */
421     if (argc - optind < 2) {
422         rrd_set_error("Not enough arguments");
423         goto out;
424     }
425
426     if ((tmplt != NULL) && (daemon != NULL))
427     {
428         rrd_set_error("The caching daemon cannot be used together with "
429                 "templates yet.");
430         goto out;
431     }
432
433     if ((tmplt == NULL) && (daemon == NULL))
434     {
435         char *temp;
436
437         temp = getenv (ENV_RRDCACHED_ADDRESS);
438         if (temp != NULL)
439         {
440             daemon = strdup (temp);
441             if (daemon == NULL)
442             {
443                 rrd_set_error("strdup failed.");
444                 goto out;
445             }
446         }
447     }
448
449     if (daemon != NULL)
450     {
451         int status;
452
453         status = rrdc_connect (daemon);
454         if (status != 0)
455         {
456             rrd_set_error("Unable to connect to daemon: %s",
457                     (status < 0)
458                     ? "Internal error"
459                     : rrd_strerror (status));
460             goto out;
461         }
462
463         status = rrdc_update (/* file = */ argv[optind],
464                 /* values_num = */ argc - optind - 1,
465                 /* values = */ (void *) (argv + optind + 1));
466         if (status != 0)
467         {
468             rrd_set_error("Failed sending the values to the daemon: %s",
469                     (status < 0)
470                     ? "Internal error"
471                     : rrd_strerror (status));
472         }
473
474         rrdc_disconnect ();
475         goto out;
476     } /* if (daemon != NULL) */
477
478     rc = rrd_update_r(argv[optind], tmplt,
479                       argc - optind - 1, (const char **) (argv + optind + 1));
480   out:
481     if (tmplt != NULL)
482     {
483         free(tmplt);
484         tmplt = NULL;
485     }
486     if (daemon != NULL)
487     {
488         free (daemon);
489         daemon = NULL;
490     }
491     return rc;
492 }
493
494 int rrd_update_r(
495     const char *filename,
496     const char *tmplt,
497     int argc,
498     const char **argv)
499 {
500     return _rrd_update(filename, tmplt, argc, argv, NULL);
501 }
502
503 int _rrd_update(
504     const char *filename,
505     const char *tmplt,
506     int argc,
507     const char **argv,
508     rrd_info_t * pcdp_summary)
509 {
510
511     int       arg_i = 2;
512
513     unsigned long rra_begin;    /* byte pointer to the rra
514                                  * area in the rrd file.  this
515                                  * pointer never changes value */
516     unsigned long rra_current;  /* byte pointer to the current write
517                                  * spot in the rrd file. */
518     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
519                              * to the existing entry */
520     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
521                              * to the cdp values */
522
523     long     *tmpl_idx; /* index representing the settings
524                          * transported by the tmplt index */
525     unsigned long tmpl_cnt = 2; /* time and data */
526     rrd_t     rrd;
527     time_t    current_time = 0;
528     unsigned long current_time_usec = 0;    /* microseconds part of current time */
529     char    **updvals;
530     int       schedule_smooth = 0;
531
532     /* number of elapsed PDP steps since last update */
533     unsigned long *rra_step_cnt = NULL;
534
535     int       version;  /* rrd version */
536     rrd_file_t *rrd_file;
537     char     *arg_copy; /* for processing the argv */
538     unsigned long *skip_update; /* RRAs to advance but not write */
539
540     /* need at least 1 arguments: data. */
541     if (argc < 1) {
542         rrd_set_error("Not enough arguments");
543         goto err_out;
544     }
545
546     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
547         goto err_free;
548     }
549     /* We are now at the beginning of the rra's */
550     rra_current = rra_begin = rrd_file->header_len;
551
552     version = atoi(rrd.stat_head->version);
553
554     initialize_time(&current_time, &current_time_usec, version);
555
556     /* get exclusive lock to whole file.
557      * lock gets removed when we close the file.
558      */
559     if (rrd_lock(rrd_file) != 0) {
560         rrd_set_error("could not lock RRD");
561         goto err_close;
562     }
563
564     if (allocate_data_structures(&rrd, &updvals,
565                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
566                                  &rra_step_cnt, &skip_update,
567                                  &pdp_new) == -1) {
568         goto err_close;
569     }
570
571     /* loop through the arguments. */
572     for (arg_i = 0; arg_i < argc; arg_i++) {
573         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
574             rrd_set_error("failed duplication argv entry");
575             break;
576         }
577         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
578                         &current_time, &current_time_usec, pdp_temp, pdp_new,
579                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
580                         &pcdp_summary, version, skip_update,
581                         &schedule_smooth) == -1) {
582             if (rrd_test_error()) { /* Should have error string always here */
583                 char *save_error;
584
585                 /* Prepend file name to error message */
586                 if ((save_error = strdup(rrd_get_error())) != NULL) {
587                     rrd_set_error("%s: %s", filename, save_error);
588                     free(save_error);
589                 }
590             }
591             free(arg_copy);
592             break;
593         }
594         free(arg_copy);
595     }
596
597     free(rra_step_cnt);
598
599     /* if we got here and if there is an error and if the file has not been
600      * written to, then close things up and return. */
601     if (rrd_test_error()) {
602         goto err_free_structures;
603     }
604 #ifndef HAVE_MMAP
605     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
606         goto err_free_structures;
607     }
608 #endif
609
610     /* calling the smoothing code here guarantees at most one smoothing
611      * operation per rrd_update call. Unfortunately, it is possible with bulk
612      * updates, or a long-delayed update for smoothing to occur off-schedule.
613      * This really isn't critical except during the burn-in cycles. */
614     if (schedule_smooth) {
615         smooth_all_rras(&rrd, rrd_file, rra_begin);
616     }
617
618 /*    rrd_dontneed(rrd_file,&rrd); */
619     rrd_free(&rrd);
620     rrd_close(rrd_file);
621
622     free(pdp_new);
623     free(tmpl_idx);
624     free(pdp_temp);
625     free(skip_update);
626     free(updvals);
627     return 0;
628
629   err_free_structures:
630     free(pdp_new);
631     free(tmpl_idx);
632     free(pdp_temp);
633     free(skip_update);
634     free(updvals);
635   err_close:
636     rrd_close(rrd_file);
637   err_free:
638     rrd_free(&rrd);
639   err_out:
640     return -1;
641 }
642
643 /*
644  * get exclusive lock to whole file.
645  * lock gets removed when we close the file
646  *
647  * returns 0 on success
648  */
649 int rrd_lock(
650     rrd_file_t *file)
651 {
652     int       rcstat;
653
654     {
655 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
656         struct _stat st;
657
658         if (_fstat(file->fd, &st) == 0) {
659             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
660         } else {
661             rcstat = -1;
662         }
663 #else
664         struct flock lock;
665
666         lock.l_type = F_WRLCK;  /* exclusive write lock */
667         lock.l_len = 0; /* whole file */
668         lock.l_start = 0;   /* start of file */
669         lock.l_whence = SEEK_SET;   /* end of file */
670
671         rcstat = fcntl(file->fd, F_SETLK, &lock);
672 #endif
673     }
674
675     return (rcstat);
676 }
677
678 /*
679  * Allocate some important arrays used, and initialize the template.
680  *
681  * When it returns, either all of the structures are allocated
682  * or none of them are.
683  *
684  * Returns 0 on success, -1 on error.
685  */
686 static int allocate_data_structures(
687     rrd_t *rrd,
688     char ***updvals,
689     rrd_value_t **pdp_temp,
690     const char *tmplt,
691     long **tmpl_idx,
692     unsigned long *tmpl_cnt,
693     unsigned long **rra_step_cnt,
694     unsigned long **skip_update,
695     rrd_value_t **pdp_new)
696 {
697     unsigned  i, ii;
698     if ((*updvals = (char **) malloc(sizeof(char *)
699                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
700         rrd_set_error("allocating updvals pointer array.");
701         return -1;
702     }
703     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
704                                             * rrd->stat_head->ds_cnt)) ==
705         NULL) {
706         rrd_set_error("allocating pdp_temp.");
707         goto err_free_updvals;
708     }
709     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
710                                                  *
711                                                  rrd->stat_head->rra_cnt)) ==
712         NULL) {
713         rrd_set_error("allocating skip_update.");
714         goto err_free_pdp_temp;
715     }
716     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
717                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
718         rrd_set_error("allocating tmpl_idx.");
719         goto err_free_skip_update;
720     }
721     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
722                                                   *
723                                                   (rrd->stat_head->
724                                                    rra_cnt))) == NULL) {
725         rrd_set_error("allocating rra_step_cnt.");
726         goto err_free_tmpl_idx;
727     }
728
729     /* initialize tmplt redirector */
730     /* default config example (assume DS 1 is a CDEF DS)
731        tmpl_idx[0] -> 0; (time)
732        tmpl_idx[1] -> 1; (DS 0)
733        tmpl_idx[2] -> 3; (DS 2)
734        tmpl_idx[3] -> 4; (DS 3) */
735     (*tmpl_idx)[0] = 0; /* time */
736     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
737         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
738             (*tmpl_idx)[ii++] = i;
739     }
740     *tmpl_cnt = ii;
741
742     if (tmplt != NULL) {
743         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
744             goto err_free_rra_step_cnt;
745         }
746     }
747
748     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
749                                            * rrd->stat_head->ds_cnt)) == NULL) {
750         rrd_set_error("allocating pdp_new.");
751         goto err_free_rra_step_cnt;
752     }
753
754     return 0;
755
756   err_free_rra_step_cnt:
757     free(*rra_step_cnt);
758   err_free_tmpl_idx:
759     free(*tmpl_idx);
760   err_free_skip_update:
761     free(*skip_update);
762   err_free_pdp_temp:
763     free(*pdp_temp);
764   err_free_updvals:
765     free(*updvals);
766     return -1;
767 }
768
769 /*
770  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
771  *
772  * Returns 0 on success.
773  */
774 static int parse_template(
775     rrd_t *rrd,
776     const char *tmplt,
777     unsigned long *tmpl_cnt,
778     long *tmpl_idx)
779 {
780     char     *dsname, *tmplt_copy;
781     unsigned int tmpl_len, i;
782     int       ret = 0;
783
784     *tmpl_cnt = 1;      /* the first entry is the time */
785
786     /* we should work on a writeable copy here */
787     if ((tmplt_copy = strdup(tmplt)) == NULL) {
788         rrd_set_error("error copying tmplt '%s'", tmplt);
789         ret = -1;
790         goto out;
791     }
792
793     dsname = tmplt_copy;
794     tmpl_len = strlen(tmplt_copy);
795     for (i = 0; i <= tmpl_len; i++) {
796         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
797             tmplt_copy[i] = '\0';
798             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
799                 rrd_set_error("tmplt contains more DS definitions than RRD");
800                 ret = -1;
801                 goto out_free_tmpl_copy;
802             }
803             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
804                 rrd_set_error("unknown DS name '%s'", dsname);
805                 ret = -1;
806                 goto out_free_tmpl_copy;
807             }
808             /* go to the next entry on the tmplt_copy */
809             if (i < tmpl_len)
810                 dsname = &tmplt_copy[i + 1];
811         }
812     }
813   out_free_tmpl_copy:
814     free(tmplt_copy);
815   out:
816     return ret;
817 }
818
819 /*
820  * Parse an update string, updates the primary data points (PDPs)
821  * and consolidated data points (CDPs), and writes changes to the RRAs.
822  *
823  * Returns 0 on success, -1 on error.
824  */
825 static int process_arg(
826     char *step_start,
827     rrd_t *rrd,
828     rrd_file_t *rrd_file,
829     unsigned long rra_begin,
830     unsigned long *rra_current,
831     time_t *current_time,
832     unsigned long *current_time_usec,
833     rrd_value_t *pdp_temp,
834     rrd_value_t *pdp_new,
835     unsigned long *rra_step_cnt,
836     char **updvals,
837     long *tmpl_idx,
838     unsigned long tmpl_cnt,
839     rrd_info_t ** pcdp_summary,
840     int version,
841     unsigned long *skip_update,
842     int *schedule_smooth)
843 {
844     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
845
846     /* a vector of future Holt-Winters seasonal coefs */
847     unsigned long elapsed_pdp_st;
848
849     double    interval, pre_int, post_int;  /* interval between this and
850                                              * the last run */
851     unsigned long proc_pdp_cnt;
852     unsigned long rra_start;
853
854     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
855                  current_time, current_time_usec, version) == -1) {
856         return -1;
857     }
858     /* seek to the beginning of the rra's */
859     if (*rra_current != rra_begin) {
860 #ifndef HAVE_MMAP
861         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
862             rrd_set_error("seek error in rrd");
863             return -1;
864         }
865 #endif
866         *rra_current = rra_begin;
867     }
868     rra_start = rra_begin;
869
870     interval = (double) (*current_time - rrd->live_head->last_up)
871         + (double) ((long) *current_time_usec -
872                     (long) rrd->live_head->last_up_usec) / 1e6f;
873
874     /* process the data sources and update the pdp_prep 
875      * area accordingly */
876     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
877         return -1;
878     }
879
880     elapsed_pdp_st = calculate_elapsed_steps(rrd,
881                                              *current_time,
882                                              *current_time_usec, interval,
883                                              &pre_int, &post_int,
884                                              &proc_pdp_cnt);
885
886     /* has a pdp_st moment occurred since the last run ? */
887     if (elapsed_pdp_st == 0) {
888         /* no we have not passed a pdp_st moment. therefore update is simple */
889         simple_update(rrd, interval, pdp_new);
890     } else {
891         /* an pdp_st has occurred. */
892         if (process_all_pdp_st(rrd, interval,
893                                pre_int, post_int,
894                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
895             return -1;
896         }
897         if (update_all_cdp_prep(rrd, rra_step_cnt,
898                                 rra_begin, rrd_file,
899                                 elapsed_pdp_st,
900                                 proc_pdp_cnt,
901                                 &last_seasonal_coef,
902                                 &seasonal_coef,
903                                 pdp_temp, rra_current,
904                                 skip_update, schedule_smooth) == -1) {
905             goto err_free_coefficients;
906         }
907         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
908                                  elapsed_pdp_st, pdp_temp,
909                                  &seasonal_coef) == -1) {
910             goto err_free_coefficients;
911         }
912         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
913                           rra_current, *current_time, skip_update,
914                           pcdp_summary) == -1) {
915             goto err_free_coefficients;
916         }
917     }                   /* endif a pdp_st has occurred */
918     rrd->live_head->last_up = *current_time;
919     rrd->live_head->last_up_usec = *current_time_usec;
920
921     if (version < 3) {
922         *rrd->legacy_last_up = rrd->live_head->last_up;
923     }
924     free(seasonal_coef);
925     free(last_seasonal_coef);
926     return 0;
927
928   err_free_coefficients:
929     free(seasonal_coef);
930     free(last_seasonal_coef);
931     return -1;
932 }
933
934 /*
935  * Parse a DS string (time + colon-separated values), storing the
936  * results in current_time, current_time_usec, and updvals.
937  *
938  * Returns 0 on success, -1 on error.
939  */
940 static int parse_ds(
941     rrd_t *rrd,
942     char **updvals,
943     long *tmpl_idx,
944     char *input,
945     unsigned long tmpl_cnt,
946     time_t *current_time,
947     unsigned long *current_time_usec,
948     int version)
949 {
950     char     *p;
951     unsigned long i;
952     char      timesyntax;
953
954     updvals[0] = input;
955     /* initialize all ds input to unknown except the first one
956        which has always got to be set */
957     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
958         updvals[i] = "U";
959
960     /* separate all ds elements; first must be examined separately
961        due to alternate time syntax */
962     if ((p = strchr(input, '@')) != NULL) {
963         timesyntax = '@';
964     } else if ((p = strchr(input, ':')) != NULL) {
965         timesyntax = ':';
966     } else {
967         rrd_set_error("expected timestamp not found in data source from %s",
968                       input);
969         return -1;
970     }
971     *p = '\0';
972     i = 1;
973     updvals[tmpl_idx[i++]] = p + 1;
974     while (*(++p)) {
975         if (*p == ':') {
976             *p = '\0';
977             if (i < tmpl_cnt) {
978                 updvals[tmpl_idx[i++]] = p + 1;
979             }
980         }
981     }
982
983     if (i != tmpl_cnt) {
984         rrd_set_error("expected %lu data source readings (got %lu) from %s",
985                       tmpl_cnt - 1, i, input);
986         return -1;
987     }
988
989     if (get_time_from_reading(rrd, timesyntax, updvals,
990                               current_time, current_time_usec,
991                               version) == -1) {
992         return -1;
993     }
994     return 0;
995 }
996
997 /*
998  * Parse the time in a DS string, store it in current_time and 
999  * current_time_usec and verify that it's later than the last
1000  * update for this DS.
1001  *
1002  * Returns 0 on success, -1 on error.
1003  */
1004 static int get_time_from_reading(
1005     rrd_t *rrd,
1006     char timesyntax,
1007     char **updvals,
1008     time_t *current_time,
1009     unsigned long *current_time_usec,
1010     int version)
1011 {
1012     double    tmp;
1013     char     *parsetime_error = NULL;
1014     char     *old_locale;
1015     rrd_time_value_t ds_tv;
1016     struct timeval tmp_time;    /* used for time conversion */
1017
1018     /* get the time from the reading ... handle N */
1019     if (timesyntax == '@') {    /* at-style */
1020         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1021             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1022             return -1;
1023         }
1024         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1025             ds_tv.type == RELATIVE_TO_START_TIME) {
1026             rrd_set_error("specifying time relative to the 'start' "
1027                           "or 'end' makes no sense here: %s", updvals[0]);
1028             return -1;
1029         }
1030         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1031         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1032     } else if (strcmp(updvals[0], "N") == 0) {
1033         gettimeofday(&tmp_time, 0);
1034         normalize_time(&tmp_time);
1035         *current_time = tmp_time.tv_sec;
1036         *current_time_usec = tmp_time.tv_usec;
1037     } else {
1038         old_locale = setlocale(LC_NUMERIC, "C");
1039         tmp = strtod(updvals[0], 0);
1040         setlocale(LC_NUMERIC, old_locale);
1041         *current_time = floor(tmp);
1042         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1043     }
1044     /* dont do any correction for old version RRDs */
1045     if (version < 3)
1046         *current_time_usec = 0;
1047
1048     if (*current_time < rrd->live_head->last_up ||
1049         (*current_time == rrd->live_head->last_up &&
1050          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1051         rrd_set_error("illegal attempt to update using time %ld when "
1052                       "last update time is %ld (minimum one second step)",
1053                       *current_time, rrd->live_head->last_up);
1054         return -1;
1055     }
1056     return 0;
1057 }
1058
1059 /*
1060  * Update pdp_new by interpreting the updvals according to the DS type
1061  * (COUNTER, GAUGE, etc.).
1062  *
1063  * Returns 0 on success, -1 on error.
1064  */
1065 static int update_pdp_prep(
1066     rrd_t *rrd,
1067     char **updvals,
1068     rrd_value_t *pdp_new,
1069     double interval)
1070 {
1071     unsigned long ds_idx;
1072     int       ii;
1073     char     *endptr;   /* used in the conversion */
1074     double    rate;
1075     char     *old_locale;
1076     enum dst_en dst_idx;
1077
1078     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1079         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1080
1081         /* make sure we do not build diffs with old last_ds values */
1082         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1083             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1084             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1085         }
1086
1087         /* NOTE: DST_CDEF should never enter this if block, because
1088          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1089          * accidently specified a value for the DST_CDEF. To handle this case,
1090          * an extra check is required. */
1091
1092         if ((updvals[ds_idx + 1][0] != 'U') &&
1093             (dst_idx != DST_CDEF) &&
1094             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1095             rate = DNAN;
1096
1097             /* pdp_new contains rate * time ... eg the bytes transferred during
1098              * the interval. Doing it this way saves a lot of math operations
1099              */
1100             switch (dst_idx) {
1101             case DST_COUNTER:
1102             case DST_DERIVE:
1103                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1104                     if ((updvals[ds_idx + 1][ii] < '0'
1105                          || updvals[ds_idx + 1][ii] > '9')
1106                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1107                         rrd_set_error("not a simple integer: '%s'",
1108                                       updvals[ds_idx + 1]);
1109                         return -1;
1110                     }
1111                 }
1112                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1113                     pdp_new[ds_idx] =
1114                         rrd_diff(updvals[ds_idx + 1],
1115                                  rrd->pdp_prep[ds_idx].last_ds);
1116                     if (dst_idx == DST_COUNTER) {
1117                         /* simple overflow catcher. This will fail
1118                          * terribly for non 32 or 64 bit counters
1119                          * ... are there any others in SNMP land?
1120                          */
1121                         if (pdp_new[ds_idx] < (double) 0.0)
1122                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1123                         if (pdp_new[ds_idx] < (double) 0.0)
1124                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1125                     }
1126                     rate = pdp_new[ds_idx] / interval;
1127                 } else {
1128                     pdp_new[ds_idx] = DNAN;
1129                 }
1130                 break;
1131             case DST_ABSOLUTE:
1132                 old_locale = setlocale(LC_NUMERIC, "C");
1133                 errno = 0;
1134                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1135                 setlocale(LC_NUMERIC, old_locale);
1136                 if (errno > 0) {
1137                     rrd_set_error("converting '%s' to float: %s",
1138                                   updvals[ds_idx + 1], rrd_strerror(errno));
1139                     return -1;
1140                 };
1141                 if (endptr[0] != '\0') {
1142                     rrd_set_error
1143                         ("conversion of '%s' to float not complete: tail '%s'",
1144                          updvals[ds_idx + 1], endptr);
1145                     return -1;
1146                 }
1147                 rate = pdp_new[ds_idx] / interval;
1148                 break;
1149             case DST_GAUGE:
1150                 errno = 0;
1151                 old_locale = setlocale(LC_NUMERIC, "C");
1152                 pdp_new[ds_idx] =
1153                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1154                 setlocale(LC_NUMERIC, old_locale);
1155                 if (errno) {
1156                     rrd_set_error("converting '%s' to float: %s",
1157                                   updvals[ds_idx + 1], rrd_strerror(errno));
1158                     return -1;
1159                 };
1160                 if (endptr[0] != '\0') {
1161                     rrd_set_error
1162                         ("conversion of '%s' to float not complete: tail '%s'",
1163                          updvals[ds_idx + 1], endptr);
1164                     return -1;
1165                 }
1166                 rate = pdp_new[ds_idx] / interval;
1167                 break;
1168             default:
1169                 rrd_set_error("rrd contains unknown DS type : '%s'",
1170                               rrd->ds_def[ds_idx].dst);
1171                 return -1;
1172             }
1173             /* break out of this for loop if the error string is set */
1174             if (rrd_test_error()) {
1175                 return -1;
1176             }
1177             /* make sure pdp_temp is neither too large or too small
1178              * if any of these occur it becomes unknown ...
1179              * sorry folks ... */
1180             if (!isnan(rate) &&
1181                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1182                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1183                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1184                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1185                 pdp_new[ds_idx] = DNAN;
1186             }
1187         } else {
1188             /* no news is news all the same */
1189             pdp_new[ds_idx] = DNAN;
1190         }
1191
1192
1193         /* make a copy of the command line argument for the next run */
1194 #ifdef DEBUG
1195         fprintf(stderr, "prep ds[%lu]\t"
1196                 "last_arg '%s'\t"
1197                 "this_arg '%s'\t"
1198                 "pdp_new %10.2f\n",
1199                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1200                 pdp_new[ds_idx]);
1201 #endif
1202         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1203                 LAST_DS_LEN - 1);
1204         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1205     }
1206     return 0;
1207 }
1208
1209 /*
1210  * How many PDP steps have elapsed since the last update? Returns the answer,
1211  * and stores the time between the last update and the last PDP in pre_time,
1212  * and the time between the last PDP and the current time in post_int.
1213  */
1214 static int calculate_elapsed_steps(
1215     rrd_t *rrd,
1216     unsigned long current_time,
1217     unsigned long current_time_usec,
1218     double interval,
1219     double *pre_int,
1220     double *post_int,
1221     unsigned long *proc_pdp_cnt)
1222 {
1223     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1224     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1225                                  * time */
1226     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1227                                  * when it was last updated */
1228     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1229
1230     /* when was the current pdp started */
1231     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1232     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1233
1234     /* when did the last pdp_st occur */
1235     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1236     occu_pdp_st = current_time - occu_pdp_age;
1237
1238     if (occu_pdp_st > proc_pdp_st) {
1239         /* OK we passed the pdp_st moment */
1240         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1241                                                                      * occurred before the latest
1242                                                                      * pdp_st moment*/
1243         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1244         *post_int = occu_pdp_age;   /* how much after it */
1245         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1246     } else {
1247         *pre_int = interval;
1248         *post_int = 0;
1249     }
1250
1251     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1252
1253 #ifdef DEBUG
1254     printf("proc_pdp_age %lu\t"
1255            "proc_pdp_st %lu\t"
1256            "occu_pfp_age %lu\t"
1257            "occu_pdp_st %lu\t"
1258            "int %lf\t"
1259            "pre_int %lf\t"
1260            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1261            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1262 #endif
1263
1264     /* compute the number of elapsed pdp_st moments */
1265     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1266 }
1267
1268 /*
1269  * Increment the PDP values by the values in pdp_new, or else initialize them.
1270  */
1271 static void simple_update(
1272     rrd_t *rrd,
1273     double interval,
1274     rrd_value_t *pdp_new)
1275 {
1276     int       i;
1277
1278     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1279         if (isnan(pdp_new[i])) {
1280             /* this is not really accurate if we use subsecond data arrival time
1281                should have thought of it when going subsecond resolution ...
1282                sorry next format change we will have it! */
1283             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1284                 floor(interval);
1285         } else {
1286             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1287                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1288             } else {
1289                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1290             }
1291         }
1292 #ifdef DEBUG
1293         fprintf(stderr,
1294                 "NO PDP  ds[%i]\t"
1295                 "value %10.2f\t"
1296                 "unkn_sec %5lu\n",
1297                 i,
1298                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1299                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1300 #endif
1301     }
1302 }
1303
1304 /*
1305  * Call process_pdp_st for each DS.
1306  *
1307  * Returns 0 on success, -1 on error.
1308  */
1309 static int process_all_pdp_st(
1310     rrd_t *rrd,
1311     double interval,
1312     double pre_int,
1313     double post_int,
1314     unsigned long elapsed_pdp_st,
1315     rrd_value_t *pdp_new,
1316     rrd_value_t *pdp_temp)
1317 {
1318     unsigned long ds_idx;
1319
1320     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1321        rate*seconds which occurred up to the last run.
1322        pdp_new[] contains rate*seconds from the latest run.
1323        pdp_temp[] will contain the rate for cdp */
1324
1325     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1326         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1327                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1328                            pdp_new, pdp_temp) == -1) {
1329             return -1;
1330         }
1331 #ifdef DEBUG
1332         fprintf(stderr, "PDP UPD ds[%lu]\t"
1333                 "elapsed_pdp_st %lu\t"
1334                 "pdp_temp %10.2f\t"
1335                 "new_prep %10.2f\t"
1336                 "new_unkn_sec %5lu\n",
1337                 ds_idx,
1338                 elapsed_pdp_st,
1339                 pdp_temp[ds_idx],
1340                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1341                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1342 #endif
1343     }
1344     return 0;
1345 }
1346
1347 /*
1348  * Process an update that occurs after one of the PDP moments.
1349  * Increments the PDP value, sets NAN if time greater than the
1350  * heartbeats have elapsed, processes CDEFs.
1351  *
1352  * Returns 0 on success, -1 on error.
1353  */
1354 static int process_pdp_st(
1355     rrd_t *rrd,
1356     unsigned long ds_idx,
1357     double interval,
1358     double pre_int,
1359     double post_int,
1360     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1361     rrd_value_t *pdp_new,
1362     rrd_value_t *pdp_temp)
1363 {
1364     int       i;
1365
1366     /* update pdp_prep to the current pdp_st. */
1367     double    pre_unknown = 0.0;
1368     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1369     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1370
1371     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1372
1373     rpnstack_init(&rpnstack);
1374
1375
1376     if (isnan(pdp_new[ds_idx])) {
1377         /* a final bit of unknown to be added before calculation
1378            we use a temporary variable for this so that we
1379            don't have to turn integer lines before using the value */
1380         pre_unknown = pre_int;
1381     } else {
1382         if (isnan(scratch[PDP_val].u_val)) {
1383             scratch[PDP_val].u_val = 0;
1384         }
1385         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1386     }
1387
1388     /* if too much of the pdp_prep is unknown we dump it */
1389     /* if the interval is larger thatn mrhb we get NAN */
1390     if ((interval > mrhb) ||
1391         (rrd->stat_head->pdp_step / 2.0 <
1392          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1393         pdp_temp[ds_idx] = DNAN;
1394     } else {
1395         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1396             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1397              pre_unknown);
1398     }
1399
1400     /* process CDEF data sources; remember each CDEF DS can
1401      * only reference other DS with a lower index number */
1402     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1403         rpnp_t   *rpnp;
1404
1405         rpnp =
1406             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1407         /* substitute data values for OP_VARIABLE nodes */
1408         for (i = 0; rpnp[i].op != OP_END; i++) {
1409             if (rpnp[i].op == OP_VARIABLE) {
1410                 rpnp[i].op = OP_NUMBER;
1411                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1412             }
1413         }
1414         /* run the rpn calculator */
1415         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1416             free(rpnp);
1417             rpnstack_free(&rpnstack);
1418             return -1;
1419         }
1420     }
1421
1422     /* make pdp_prep ready for the next run */
1423     if (isnan(pdp_new[ds_idx])) {
1424         /* this is not realy accurate if we use subsecond data arival time
1425            should have thought of it when going subsecond resolution ...
1426            sorry next format change we will have it! */
1427         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1428         scratch[PDP_val].u_val = DNAN;
1429     } else {
1430         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1431         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1432     }
1433     rpnstack_free(&rpnstack);
1434     return 0;
1435 }
1436
1437 /*
1438  * Iterate over all the RRAs for a given DS and:
1439  * 1. Decide whether to schedule a smooth later
1440  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1441  * 3. Update the CDP
1442  *
1443  * Returns 0 on success, -1 on error
1444  */
1445 static int update_all_cdp_prep(
1446     rrd_t *rrd,
1447     unsigned long *rra_step_cnt,
1448     unsigned long rra_begin,
1449     rrd_file_t *rrd_file,
1450     unsigned long elapsed_pdp_st,
1451     unsigned long proc_pdp_cnt,
1452     rrd_value_t **last_seasonal_coef,
1453     rrd_value_t **seasonal_coef,
1454     rrd_value_t *pdp_temp,
1455     unsigned long *rra_current,
1456     unsigned long *skip_update,
1457     int *schedule_smooth)
1458 {
1459     unsigned long rra_idx;
1460
1461     /* index into the CDP scratch array */
1462     enum cf_en current_cf;
1463     unsigned long rra_start;
1464
1465     /* number of rows to be updated in an RRA for a data value. */
1466     unsigned long start_pdp_offset;
1467
1468     rra_start = rra_begin;
1469     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1470         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1471         start_pdp_offset =
1472             rrd->rra_def[rra_idx].pdp_cnt -
1473             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1474         skip_update[rra_idx] = 0;
1475         if (start_pdp_offset <= elapsed_pdp_st) {
1476             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1477                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1478         } else {
1479             rra_step_cnt[rra_idx] = 0;
1480         }
1481
1482         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1483             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1484              * so that they will be correct for the next observed value; note that for
1485              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1486              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1487             if (rra_step_cnt[rra_idx] > 1) {
1488                 skip_update[rra_idx] = 1;
1489                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1490                                 elapsed_pdp_st, last_seasonal_coef);
1491                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1492                                 elapsed_pdp_st + 1, seasonal_coef);
1493             }
1494             /* periodically run a smoother for seasonal effects */
1495             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1496 #ifdef DEBUG
1497                 fprintf(stderr,
1498                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1499                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1500                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1501                         u_cnt);
1502 #endif
1503                 *schedule_smooth = 1;
1504             }
1505             *rra_current = rrd_tell(rrd_file);
1506         }
1507         if (rrd_test_error())
1508             return -1;
1509
1510         if (update_cdp_prep
1511             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1512              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1513              current_cf) == -1) {
1514             return -1;
1515         }
1516         rra_start +=
1517             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1518             sizeof(rrd_value_t);
1519     }
1520     return 0;
1521 }
1522
1523 /* 
1524  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1525  */
1526 static int do_schedule_smooth(
1527     rrd_t *rrd,
1528     unsigned long rra_idx,
1529     unsigned long elapsed_pdp_st)
1530 {
1531     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1532     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1533     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1534     unsigned long seasonal_smooth_idx =
1535         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1536     unsigned long *init_seasonal =
1537         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1538
1539     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1540      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1541      * really an RRA level, not a data source within RRA level parameter, but
1542      * the rra_def is read only for rrd_update (not flushed to disk). */
1543     if (*init_seasonal > BURNIN_CYCLES) {
1544         /* someone has no doubt invented a trick to deal with this wrap around,
1545          * but at least this code is clear. */
1546         if (seasonal_smooth_idx > cur_row) {
1547             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1548              * between PDP and CDP */
1549             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1550         }
1551         /* can't rely on negative numbers because we are working with
1552          * unsigned values */
1553         return (cur_row + elapsed_pdp_st >= row_cnt
1554                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1555     }
1556     /* mark off one of the burn-in cycles */
1557     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1558 }
1559
1560 /*
1561  * For a given RRA, iterate over the data sources and call the appropriate
1562  * consolidation function.
1563  *
1564  * Returns 0 on success, -1 on error.
1565  */
1566 static int update_cdp_prep(
1567     rrd_t *rrd,
1568     unsigned long elapsed_pdp_st,
1569     unsigned long start_pdp_offset,
1570     unsigned long *rra_step_cnt,
1571     int rra_idx,
1572     rrd_value_t *pdp_temp,
1573     rrd_value_t *last_seasonal_coef,
1574     rrd_value_t *seasonal_coef,
1575     int current_cf)
1576 {
1577     unsigned long ds_idx, cdp_idx;
1578
1579     /* update CDP_PREP areas */
1580     /* loop over data soures within each RRA */
1581     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1582
1583         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1584
1585         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1586             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1587                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1588                        elapsed_pdp_st, start_pdp_offset,
1589                        rrd->rra_def[rra_idx].pdp_cnt,
1590                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1591                        rra_idx, ds_idx);
1592         } else {
1593             /* Nothing to consolidate if there's one PDP per CDP. However, if
1594              * we've missed some PDPs, let's update null counters etc. */
1595             if (elapsed_pdp_st > 2) {
1596                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1597                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1598                           current_cf);
1599             }
1600         }
1601
1602         if (rrd_test_error())
1603             return -1;
1604     }                   /* endif data sources loop */
1605     return 0;
1606 }
1607
1608 /*
1609  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1610  * primary value, secondary value, and # of unknowns.
1611  */
1612 static void update_cdp(
1613     unival *scratch,
1614     int current_cf,
1615     rrd_value_t pdp_temp_val,
1616     unsigned long rra_step_cnt,
1617     unsigned long elapsed_pdp_st,
1618     unsigned long start_pdp_offset,
1619     unsigned long pdp_cnt,
1620     rrd_value_t xff,
1621     int i,
1622     int ii)
1623 {
1624     /* shorthand variables */
1625     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1626     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1627     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1628     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1629
1630     if (rra_step_cnt) {
1631         /* If we are in this block, as least 1 CDP value will be written to
1632          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1633          * to be written, then the "fill in" value is the CDP_secondary_val
1634          * entry. */
1635         if (isnan(pdp_temp_val)) {
1636             *cdp_unkn_pdp_cnt += start_pdp_offset;
1637             *cdp_secondary_val = DNAN;
1638         } else {
1639             /* CDP_secondary value is the RRA "fill in" value for intermediary
1640              * CDP data entries. No matter the CF, the value is the same because
1641              * the average, max, min, and last of a list of identical values is
1642              * the same, namely, the value itself. */
1643             *cdp_secondary_val = pdp_temp_val;
1644         }
1645
1646         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1647             *cdp_primary_val = DNAN;
1648             if (current_cf == CF_AVERAGE) {
1649                 *cdp_val =
1650                     initialize_average_carry_over(pdp_temp_val,
1651                                                   elapsed_pdp_st,
1652                                                   start_pdp_offset, pdp_cnt);
1653             } else {
1654                 *cdp_val = pdp_temp_val;
1655             }
1656         } else {
1657             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1658                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1659         }               /* endif meets xff value requirement for a valid value */
1660         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1661          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1662         if (isnan(pdp_temp_val))
1663             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1664         else
1665             *cdp_unkn_pdp_cnt = 0;
1666     } else {            /* rra_step_cnt[i]  == 0 */
1667
1668 #ifdef DEBUG
1669         if (isnan(*cdp_val)) {
1670             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1671                     i, ii);
1672         } else {
1673             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1674                     i, ii, *cdp_val);
1675         }
1676 #endif
1677         if (isnan(pdp_temp_val)) {
1678             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1679         } else {
1680             *cdp_val =
1681                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1682                                   current_cf, i, ii);
1683         }
1684     }
1685 }
1686
1687 /*
1688  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1689  * on the type of consolidation function.
1690  */
1691 static void initialize_cdp_val(
1692     unival *scratch,
1693     int current_cf,
1694     rrd_value_t pdp_temp_val,
1695     unsigned long elapsed_pdp_st,
1696     unsigned long start_pdp_offset,
1697     unsigned long pdp_cnt)
1698 {
1699     rrd_value_t cum_val, cur_val;
1700
1701     switch (current_cf) {
1702     case CF_AVERAGE:
1703         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1704         cur_val = IFDNAN(pdp_temp_val, 0.0);
1705         scratch[CDP_primary_val].u_val =
1706             (cum_val + cur_val * start_pdp_offset) /
1707             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1708         scratch[CDP_val].u_val =
1709             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1710                                           start_pdp_offset, pdp_cnt);
1711         break;
1712     case CF_MAXIMUM:
1713         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1714         cur_val = IFDNAN(pdp_temp_val, -DINF);
1715 #if 0
1716 #ifdef DEBUG
1717         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1718             fprintf(stderr,
1719                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1720                     i, ii);
1721             exit(-1);
1722         }
1723 #endif
1724 #endif
1725         if (cur_val > cum_val)
1726             scratch[CDP_primary_val].u_val = cur_val;
1727         else
1728             scratch[CDP_primary_val].u_val = cum_val;
1729         /* initialize carry over value */
1730         scratch[CDP_val].u_val = pdp_temp_val;
1731         break;
1732     case CF_MINIMUM:
1733         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1734         cur_val = IFDNAN(pdp_temp_val, DINF);
1735 #if 0
1736 #ifdef DEBUG
1737         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1738             fprintf(stderr,
1739                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1740                     ii);
1741             exit(-1);
1742         }
1743 #endif
1744 #endif
1745         if (cur_val < cum_val)
1746             scratch[CDP_primary_val].u_val = cur_val;
1747         else
1748             scratch[CDP_primary_val].u_val = cum_val;
1749         /* initialize carry over value */
1750         scratch[CDP_val].u_val = pdp_temp_val;
1751         break;
1752     case CF_LAST:
1753     default:
1754         scratch[CDP_primary_val].u_val = pdp_temp_val;
1755         /* initialize carry over value */
1756         scratch[CDP_val].u_val = pdp_temp_val;
1757         break;
1758     }
1759 }
1760
1761 /*
1762  * Update the consolidation function for Holt-Winters functions as
1763  * well as other functions that don't actually consolidate multiple
1764  * PDPs.
1765  */
1766 static void reset_cdp(
1767     rrd_t *rrd,
1768     unsigned long elapsed_pdp_st,
1769     rrd_value_t *pdp_temp,
1770     rrd_value_t *last_seasonal_coef,
1771     rrd_value_t *seasonal_coef,
1772     int rra_idx,
1773     int ds_idx,
1774     int cdp_idx,
1775     enum cf_en current_cf)
1776 {
1777     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1778
1779     switch (current_cf) {
1780     case CF_AVERAGE:
1781     default:
1782         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1783         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1784         break;
1785     case CF_SEASONAL:
1786     case CF_DEVSEASONAL:
1787         /* need to update cached seasonal values, so they are consistent
1788          * with the bulk update */
1789         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1790          * CDP_last_deviation are the same. */
1791         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1792         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1793         break;
1794     case CF_HWPREDICT:
1795     case CF_MHWPREDICT:
1796         /* need to update the null_count and last_null_count.
1797          * even do this for non-DNAN pdp_temp because the
1798          * algorithm is not learning from batch updates. */
1799         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1800         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1801         /* fall through */
1802     case CF_DEVPREDICT:
1803         scratch[CDP_primary_val].u_val = DNAN;
1804         scratch[CDP_secondary_val].u_val = DNAN;
1805         break;
1806     case CF_FAILURES:
1807         /* do not count missed bulk values as failures */
1808         scratch[CDP_primary_val].u_val = 0;
1809         scratch[CDP_secondary_val].u_val = 0;
1810         /* need to reset violations buffer.
1811          * could do this more carefully, but for now, just
1812          * assume a bulk update wipes away all violations. */
1813         erase_violations(rrd, cdp_idx, rra_idx);
1814         break;
1815     }
1816 }
1817
1818 static rrd_value_t initialize_average_carry_over(
1819     rrd_value_t pdp_temp_val,
1820     unsigned long elapsed_pdp_st,
1821     unsigned long start_pdp_offset,
1822     unsigned long pdp_cnt)
1823 {
1824     /* initialize carry over value */
1825     if (isnan(pdp_temp_val)) {
1826         return DNAN;
1827     }
1828     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1829 }
1830
1831 /*
1832  * Update or initialize a CDP value based on the consolidation
1833  * function.
1834  *
1835  * Returns the new value.
1836  */
1837 static rrd_value_t calculate_cdp_val(
1838     rrd_value_t cdp_val,
1839     rrd_value_t pdp_temp_val,
1840     unsigned long elapsed_pdp_st,
1841     int current_cf,
1842 #ifdef DEBUG
1843     int i,
1844     int ii
1845 #else
1846     int UNUSED(i),
1847     int UNUSED(ii)
1848 #endif
1849     )
1850 {
1851     if (isnan(cdp_val)) {
1852         if (current_cf == CF_AVERAGE) {
1853             pdp_temp_val *= elapsed_pdp_st;
1854         }
1855 #ifdef DEBUG
1856         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1857                 i, ii, pdp_temp_val);
1858 #endif
1859         return pdp_temp_val;
1860     }
1861     if (current_cf == CF_AVERAGE)
1862         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1863     if (current_cf == CF_MINIMUM)
1864         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1865     if (current_cf == CF_MAXIMUM)
1866         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1867
1868     return pdp_temp_val;
1869 }
1870
1871 /*
1872  * For each RRA, update the seasonal values and then call update_aberrant_CF
1873  * for each data source.
1874  *
1875  * Return 0 on success, -1 on error.
1876  */
1877 static int update_aberrant_cdps(
1878     rrd_t *rrd,
1879     rrd_file_t *rrd_file,
1880     unsigned long rra_begin,
1881     unsigned long *rra_current,
1882     unsigned long elapsed_pdp_st,
1883     rrd_value_t *pdp_temp,
1884     rrd_value_t **seasonal_coef)
1885 {
1886     unsigned long rra_idx, ds_idx, j;
1887
1888     /* number of PDP steps since the last update that
1889      * are assigned to the first CDP to be generated
1890      * since the last update. */
1891     unsigned short scratch_idx;
1892     unsigned long rra_start;
1893     enum cf_en current_cf;
1894
1895     /* this loop is only entered if elapsed_pdp_st < 3 */
1896     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1897          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1898         rra_start = rra_begin;
1899         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1900             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1901                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1902                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1903                     if (scratch_idx == CDP_primary_val) {
1904                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1905                                         elapsed_pdp_st + 1, seasonal_coef);
1906                     } else {
1907                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1908                                         elapsed_pdp_st + 2, seasonal_coef);
1909                     }
1910                     *rra_current = rrd_tell(rrd_file);
1911                 }
1912                 if (rrd_test_error())
1913                     return -1;
1914                 /* loop over data soures within each RRA */
1915                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1916                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1917                                        rra_idx * (rrd->stat_head->ds_cnt) +
1918                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1919                                        *seasonal_coef);
1920                 }
1921             }
1922             rra_start += rrd->rra_def[rra_idx].row_cnt
1923                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1924         }
1925     }
1926     return 0;
1927 }
1928
1929 /* 
1930  * Move sequentially through the file, writing one RRA at a time.  Note this
1931  * architecture divorces the computation of CDP with flushing updated RRA
1932  * entries to disk.
1933  *
1934  * Return 0 on success, -1 on error.
1935  */
1936 static int write_to_rras(
1937     rrd_t *rrd,
1938     rrd_file_t *rrd_file,
1939     unsigned long *rra_step_cnt,
1940     unsigned long rra_begin,
1941     unsigned long *rra_current,
1942     time_t current_time,
1943     unsigned long *skip_update,
1944     rrd_info_t ** pcdp_summary)
1945 {
1946     unsigned long rra_idx;
1947     unsigned long rra_start;
1948     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1949     time_t    rra_time = 0; /* time of update for a RRA */
1950
1951     /* Ready to write to disk */
1952     rra_start = rra_begin;
1953     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1954         /* skip unless there's something to write */
1955         if (rra_step_cnt[rra_idx]) {
1956             /* write the first row */
1957 #ifdef DEBUG
1958             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1959 #endif
1960             rrd->rra_ptr[rra_idx].cur_row++;
1961             if (rrd->rra_ptr[rra_idx].cur_row >=
1962                 rrd->rra_def[rra_idx].row_cnt)
1963                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1964             /* position on the first row */
1965             rra_pos_tmp = rra_start +
1966                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1967                 sizeof(rrd_value_t);
1968             if (rra_pos_tmp != *rra_current) {
1969                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1970                     rrd_set_error("seek error in rrd");
1971                     return -1;
1972                 }
1973                 *rra_current = rra_pos_tmp;
1974             }
1975 #ifdef DEBUG
1976             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1977 #endif
1978             if (!skip_update[rra_idx]) {
1979                 if (*pcdp_summary != NULL) {
1980                     rra_time = (current_time - current_time
1981                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1982                                    rrd->stat_head->pdp_step))
1983                         -
1984                         ((rra_step_cnt[rra_idx] -
1985                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1986                          rrd->stat_head->pdp_step);
1987                 }
1988                 if (write_RRA_row
1989                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1990                      pcdp_summary, rra_time) == -1)
1991                     return -1;
1992             }
1993
1994             /* write other rows of the bulk update, if any */
1995             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1996                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1997                     rrd->rra_def[rra_idx].row_cnt) {
1998 #ifdef DEBUG
1999                     fprintf(stderr,
2000                             "Wraparound for RRA %s, %lu updates left\n",
2001                             rrd->rra_def[rra_idx].cf_nam,
2002                             rra_step_cnt[rra_idx] - 1);
2003 #endif
2004                     /* wrap */
2005                     rrd->rra_ptr[rra_idx].cur_row = 0;
2006                     /* seek back to beginning of current rra */
2007                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2008                         rrd_set_error("seek error in rrd");
2009                         return -1;
2010                     }
2011 #ifdef DEBUG
2012                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
2013                             rrd_file->pos);
2014 #endif
2015                     *rra_current = rra_start;
2016                 }
2017                 if (!skip_update[rra_idx]) {
2018                     if (*pcdp_summary != NULL) {
2019                         rra_time = (current_time - current_time
2020                                     % (rrd->rra_def[rra_idx].pdp_cnt *
2021                                        rrd->stat_head->pdp_step))
2022                             -
2023                             ((rra_step_cnt[rra_idx] -
2024                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2025                              rrd->stat_head->pdp_step);
2026                     }
2027                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2028                                       CDP_secondary_val, pcdp_summary,
2029                                       rra_time) == -1)
2030                         return -1;
2031                 }
2032             }
2033         }
2034         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2035             sizeof(rrd_value_t);
2036     }                   /* RRA LOOP */
2037
2038     return 0;
2039 }
2040
2041 /*
2042  * Write out one row of values (one value per DS) to the archive.
2043  *
2044  * Returns 0 on success, -1 on error.
2045  */
2046 static int write_RRA_row(
2047     rrd_file_t *rrd_file,
2048     rrd_t *rrd,
2049     unsigned long rra_idx,
2050     unsigned long *rra_current,
2051     unsigned short CDP_scratch_idx,
2052     rrd_info_t ** pcdp_summary,
2053     time_t rra_time)
2054 {
2055     unsigned long ds_idx, cdp_idx;
2056     rrd_infoval_t iv;
2057
2058     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2059         /* compute the cdp index */
2060         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2061 #ifdef DEBUG
2062         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2063                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2064                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2065 #endif
2066         if (*pcdp_summary != NULL) {
2067             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2068             /* append info to the return hash */
2069             *pcdp_summary = rrd_info_push(*pcdp_summary,
2070                                           sprintf_alloc
2071                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2072                                            rrd->rra_def[rra_idx].cf_nam,
2073                                            rrd->rra_def[rra_idx].pdp_cnt,
2074                                            rrd->ds_def[ds_idx].ds_nam),
2075                                           RD_I_VAL, iv);
2076         }
2077         if (rrd_write(rrd_file,
2078                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2079                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2080             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2081             return -1;
2082         }
2083         *rra_current += sizeof(rrd_value_t);
2084     }
2085     return 0;
2086 }
2087
2088 /*
2089  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2090  *
2091  * Returns 0 on success, -1 otherwise
2092  */
2093 static int smooth_all_rras(
2094     rrd_t *rrd,
2095     rrd_file_t *rrd_file,
2096     unsigned long rra_begin)
2097 {
2098     unsigned long rra_start = rra_begin;
2099     unsigned long rra_idx;
2100
2101     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2102         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2103             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2104 #ifdef DEBUG
2105             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2106 #endif
2107             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2108             if (rrd_test_error())
2109                 return -1;
2110         }
2111         rra_start += rrd->rra_def[rra_idx].row_cnt
2112             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2113     }
2114     return 0;
2115 }
2116
2117 #ifndef HAVE_MMAP
2118 /*
2119  * Flush changes to disk (unless we're using mmap)
2120  *
2121  * Returns 0 on success, -1 otherwise
2122  */
2123 static int write_changes_to_disk(
2124     rrd_t *rrd,
2125     rrd_file_t *rrd_file,
2126     int version)
2127 {
2128     /* we just need to write back the live header portion now */
2129     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2130                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2131                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2132                  SEEK_SET) != 0) {
2133         rrd_set_error("seek rrd for live header writeback");
2134         return -1;
2135     }
2136     if (version >= 3) {
2137         if (rrd_write(rrd_file, rrd->live_head,
2138                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2139             rrd_set_error("rrd_write live_head to rrd");
2140             return -1;
2141         }
2142     } else {
2143         if (rrd_write(rrd_file, rrd->legacy_last_up,
2144                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2145             rrd_set_error("rrd_write live_head to rrd");
2146             return -1;
2147         }
2148     }
2149
2150
2151     if (rrd_write(rrd_file, rrd->pdp_prep,
2152                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2153         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2154         rrd_set_error("rrd_write pdp_prep to rrd");
2155         return -1;
2156     }
2157
2158     if (rrd_write(rrd_file, rrd->cdp_prep,
2159                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2160                   rrd->stat_head->ds_cnt)
2161         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2162                       rrd->stat_head->ds_cnt)) {
2163
2164         rrd_set_error("rrd_write cdp_prep to rrd");
2165         return -1;
2166     }
2167
2168     if (rrd_write(rrd_file, rrd->rra_ptr,
2169                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2170         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2171         rrd_set_error("rrd_write rra_ptr to rrd");
2172         return -1;
2173     }
2174     return 0;
2175 }
2176 #endif