681dcc47ea6b0219909978c1474bc4467f50cbb6
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322
323 rrd_info_t *rrd_update_v(
324     int argc,
325     char **argv)
326 {
327     char     *tmplt = NULL;
328     rrd_info_t *result = NULL;
329     rrd_infoval_t rc;
330     struct option long_options[] = {
331         {"template", required_argument, 0, 't'},
332         {0, 0, 0, 0}
333     };
334
335     rc.u_int = -1;
336     optind = 0;
337     opterr = 0;         /* initialize getopt */
338
339     while (1) {
340         int       option_index = 0;
341         int       opt;
342
343         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344
345         if (opt == EOF)
346             break;
347
348         switch (opt) {
349         case 't':
350             tmplt = optarg;
351             break;
352
353         case '?':
354             rrd_set_error("unknown option '%s'", argv[optind - 1]);
355             goto end_tag;
356         }
357     }
358
359     /* need at least 2 arguments: filename, data. */
360     if (argc - optind < 2) {
361         rrd_set_error("Not enough arguments");
362         goto end_tag;
363     }
364     rc.u_int = 0;
365     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366     rc.u_int = _rrd_update(argv[optind], tmplt,
367                            argc - optind - 1,
368                            (const char **) (argv + optind + 1), result);
369     result->value.u_int = rc.u_int;
370   end_tag:
371     return result;
372 }
373
374 int rrd_update(
375     int argc,
376     char **argv)
377 {
378     struct option long_options[] = {
379         {"template", required_argument, 0, 't'},
380         {"cache",    optional_argument, 0, 'c'},
381         {"nocache",  no_argument      , 0, 'n'},
382         {"daemon",   required_argument, 0, 'd'},
383         {0, 0, 0, 0}
384     };
385     int       option_index = 0;
386     int       opt;
387     char     *tmplt = NULL;
388     int       rc = -1;
389     char     *daemon = NULL;
390
391     optind = 0;
392     opterr = 0;         /* initialize getopt */
393
394     while (1) {
395         opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
396
397         if (opt == EOF)
398             break;
399
400         switch (opt) {
401         case 't':
402             tmplt = strdup(optarg);
403             break;
404
405         case 'd':
406             if (daemon != NULL)
407                 free (daemon);
408             daemon = strdup (optarg);
409             if (daemon == NULL)
410             {
411                 rrd_set_error("strdup failed.");
412                 goto out;
413             }
414             break;
415
416         case '?':
417             rrd_set_error("unknown option '%s'", argv[optind - 1]);
418             goto out;
419         }
420     }
421
422     /* need at least 2 arguments: filename, data. */
423     if (argc - optind < 2) {
424         rrd_set_error("Not enough arguments");
425         goto out;
426     }
427
428     if ((tmplt != NULL) && (daemon != NULL))
429     {
430         rrd_set_error("The caching daemon cannot be used together with "
431                 "templates yet.");
432         goto out;
433     }
434
435     if (daemon != NULL)
436     {
437         int status;
438
439         status = rrdc_connect (daemon);
440         if (status != 0)
441         {
442             rrd_set_error("Unable to connect to daemon: %s",
443                     (status < 0)
444                     ? "Internal error"
445                     : rrd_strerror (status));
446             goto out;
447         }
448
449         status = rrdc_update (/* file = */ argv[optind],
450                 /* values_num = */ argc - optind - 1,
451                 /* values = */ (void *) (argv + optind + 1));
452         if (status != 0)
453         {
454             rrd_set_error("Failed sending the values to the daemon: %s",
455                     (status < 0)
456                     ? "Internal error"
457                     : rrd_strerror (status));
458         }
459
460         rrdc_disconnect ();
461         goto out;
462     } /* if (daemon != NULL) */
463
464     rc = rrd_update_r(argv[optind], tmplt,
465                       argc - optind - 1, (const char **) (argv + optind + 1));
466   out:
467     if (tmplt != NULL)
468     {
469         free(tmplt);
470         tmplt = NULL;
471     }
472     if (daemon != NULL)
473     {
474         free (daemon);
475         daemon = NULL;
476     }
477     return rc;
478 }
479
480 int rrd_update_r(
481     const char *filename,
482     const char *tmplt,
483     int argc,
484     const char **argv)
485 {
486     return _rrd_update(filename, tmplt, argc, argv, NULL);
487 }
488
489 int _rrd_update(
490     const char *filename,
491     const char *tmplt,
492     int argc,
493     const char **argv,
494     rrd_info_t * pcdp_summary)
495 {
496
497     int       arg_i = 2;
498
499     unsigned long rra_begin;    /* byte pointer to the rra
500                                  * area in the rrd file.  this
501                                  * pointer never changes value */
502     unsigned long rra_current;  /* byte pointer to the current write
503                                  * spot in the rrd file. */
504     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
505                              * to the existing entry */
506     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
507                              * to the cdp values */
508
509     long     *tmpl_idx; /* index representing the settings
510                          * transported by the tmplt index */
511     unsigned long tmpl_cnt = 2; /* time and data */
512     rrd_t     rrd;
513     time_t    current_time = 0;
514     unsigned long current_time_usec = 0;    /* microseconds part of current time */
515     char    **updvals;
516     int       schedule_smooth = 0;
517
518     /* number of elapsed PDP steps since last update */
519     unsigned long *rra_step_cnt = NULL;
520
521     int       version;  /* rrd version */
522     rrd_file_t *rrd_file;
523     char     *arg_copy; /* for processing the argv */
524     unsigned long *skip_update; /* RRAs to advance but not write */
525
526     /* need at least 1 arguments: data. */
527     if (argc < 1) {
528         rrd_set_error("Not enough arguments");
529         goto err_out;
530     }
531
532     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533         goto err_free;
534     }
535     /* We are now at the beginning of the rra's */
536     rra_current = rra_begin = rrd_file->header_len;
537
538     version = atoi(rrd.stat_head->version);
539
540     initialize_time(&current_time, &current_time_usec, version);
541
542     /* get exclusive lock to whole file.
543      * lock gets removed when we close the file.
544      */
545     if (rrd_lock(rrd_file) != 0) {
546         rrd_set_error("could not lock RRD");
547         goto err_close;
548     }
549
550     if (allocate_data_structures(&rrd, &updvals,
551                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552                                  &rra_step_cnt, &skip_update,
553                                  &pdp_new) == -1) {
554         goto err_close;
555     }
556
557     /* loop through the arguments. */
558     for (arg_i = 0; arg_i < argc; arg_i++) {
559         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560             rrd_set_error("failed duplication argv entry");
561             break;
562         }
563         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
564                         &current_time, &current_time_usec, pdp_temp, pdp_new,
565                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566                         &pcdp_summary, version, skip_update,
567                         &schedule_smooth) == -1) {
568             free(arg_copy);
569             break;
570         }
571         free(arg_copy);
572     }
573
574     free(rra_step_cnt);
575
576     /* if we got here and if there is an error and if the file has not been
577      * written to, then close things up and return. */
578     if (rrd_test_error()) {
579         goto err_free_structures;
580     }
581 #ifndef HAVE_MMAP
582     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
583         goto err_free_structures;
584     }
585 #endif
586
587     /* calling the smoothing code here guarantees at most one smoothing
588      * operation per rrd_update call. Unfortunately, it is possible with bulk
589      * updates, or a long-delayed update for smoothing to occur off-schedule.
590      * This really isn't critical except during the burn-in cycles. */
591     if (schedule_smooth) {
592         smooth_all_rras(&rrd, rrd_file, rra_begin);
593     }
594
595 /*    rrd_dontneed(rrd_file,&rrd); */
596     rrd_free(&rrd);
597     rrd_close(rrd_file);
598
599     free(pdp_new);
600     free(tmpl_idx);
601     free(pdp_temp);
602     free(skip_update);
603     free(updvals);
604     return 0;
605
606   err_free_structures:
607     free(pdp_new);
608     free(tmpl_idx);
609     free(pdp_temp);
610     free(skip_update);
611     free(updvals);
612   err_close:
613     rrd_close(rrd_file);
614   err_free:
615     rrd_free(&rrd);
616   err_out:
617     return -1;
618 }
619
620 /*
621  * get exclusive lock to whole file.
622  * lock gets removed when we close the file
623  *
624  * returns 0 on success
625  */
626 int rrd_lock(
627     rrd_file_t *file)
628 {
629     int       rcstat;
630
631     {
632 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
633         struct _stat st;
634
635         if (_fstat(file->fd, &st) == 0) {
636             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
637         } else {
638             rcstat = -1;
639         }
640 #else
641         struct flock lock;
642
643         lock.l_type = F_WRLCK;  /* exclusive write lock */
644         lock.l_len = 0; /* whole file */
645         lock.l_start = 0;   /* start of file */
646         lock.l_whence = SEEK_SET;   /* end of file */
647
648         rcstat = fcntl(file->fd, F_SETLK, &lock);
649 #endif
650     }
651
652     return (rcstat);
653 }
654
655 /*
656  * Allocate some important arrays used, and initialize the template.
657  *
658  * When it returns, either all of the structures are allocated
659  * or none of them are.
660  *
661  * Returns 0 on success, -1 on error.
662  */
663 static int allocate_data_structures(
664     rrd_t *rrd,
665     char ***updvals,
666     rrd_value_t **pdp_temp,
667     const char *tmplt,
668     long **tmpl_idx,
669     unsigned long *tmpl_cnt,
670     unsigned long **rra_step_cnt,
671     unsigned long **skip_update,
672     rrd_value_t **pdp_new)
673 {
674     unsigned  i, ii;
675     if ((*updvals = (char **) malloc(sizeof(char *)
676                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
677         rrd_set_error("allocating updvals pointer array.");
678         return -1;
679     }
680     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
681                                             * rrd->stat_head->ds_cnt)) ==
682         NULL) {
683         rrd_set_error("allocating pdp_temp.");
684         goto err_free_updvals;
685     }
686     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
687                                                  *
688                                                  rrd->stat_head->rra_cnt)) ==
689         NULL) {
690         rrd_set_error("allocating skip_update.");
691         goto err_free_pdp_temp;
692     }
693     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
694                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
695         rrd_set_error("allocating tmpl_idx.");
696         goto err_free_skip_update;
697     }
698     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
699                                                   *
700                                                   (rrd->stat_head->
701                                                    rra_cnt))) == NULL) {
702         rrd_set_error("allocating rra_step_cnt.");
703         goto err_free_tmpl_idx;
704     }
705
706     /* initialize tmplt redirector */
707     /* default config example (assume DS 1 is a CDEF DS)
708        tmpl_idx[0] -> 0; (time)
709        tmpl_idx[1] -> 1; (DS 0)
710        tmpl_idx[2] -> 3; (DS 2)
711        tmpl_idx[3] -> 4; (DS 3) */
712     (*tmpl_idx)[0] = 0; /* time */
713     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
714         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
715             (*tmpl_idx)[ii++] = i;
716     }
717     *tmpl_cnt = ii;
718
719     if (tmplt != NULL) {
720         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
721             goto err_free_rra_step_cnt;
722         }
723     }
724
725     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
726                                            * rrd->stat_head->ds_cnt)) == NULL) {
727         rrd_set_error("allocating pdp_new.");
728         goto err_free_rra_step_cnt;
729     }
730
731     return 0;
732
733   err_free_rra_step_cnt:
734     free(*rra_step_cnt);
735   err_free_tmpl_idx:
736     free(*tmpl_idx);
737   err_free_skip_update:
738     free(*skip_update);
739   err_free_pdp_temp:
740     free(*pdp_temp);
741   err_free_updvals:
742     free(*updvals);
743     return -1;
744 }
745
746 /*
747  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
748  *
749  * Returns 0 on success.
750  */
751 static int parse_template(
752     rrd_t *rrd,
753     const char *tmplt,
754     unsigned long *tmpl_cnt,
755     long *tmpl_idx)
756 {
757     char     *dsname, *tmplt_copy;
758     unsigned int tmpl_len, i;
759     int       ret = 0;
760
761     *tmpl_cnt = 1;      /* the first entry is the time */
762
763     /* we should work on a writeable copy here */
764     if ((tmplt_copy = strdup(tmplt)) == NULL) {
765         rrd_set_error("error copying tmplt '%s'", tmplt);
766         ret = -1;
767         goto out;
768     }
769
770     dsname = tmplt_copy;
771     tmpl_len = strlen(tmplt_copy);
772     for (i = 0; i <= tmpl_len; i++) {
773         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
774             tmplt_copy[i] = '\0';
775             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
776                 rrd_set_error("tmplt contains more DS definitions than RRD");
777                 ret = -1;
778                 goto out_free_tmpl_copy;
779             }
780             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
781                 rrd_set_error("unknown DS name '%s'", dsname);
782                 ret = -1;
783                 goto out_free_tmpl_copy;
784             }
785             /* go to the next entry on the tmplt_copy */
786             if (i < tmpl_len)
787                 dsname = &tmplt_copy[i + 1];
788         }
789     }
790   out_free_tmpl_copy:
791     free(tmplt_copy);
792   out:
793     return ret;
794 }
795
796 /*
797  * Parse an update string, updates the primary data points (PDPs)
798  * and consolidated data points (CDPs), and writes changes to the RRAs.
799  *
800  * Returns 0 on success, -1 on error.
801  */
802 static int process_arg(
803     char *step_start,
804     rrd_t *rrd,
805     rrd_file_t *rrd_file,
806     unsigned long rra_begin,
807     unsigned long *rra_current,
808     time_t *current_time,
809     unsigned long *current_time_usec,
810     rrd_value_t *pdp_temp,
811     rrd_value_t *pdp_new,
812     unsigned long *rra_step_cnt,
813     char **updvals,
814     long *tmpl_idx,
815     unsigned long tmpl_cnt,
816     rrd_info_t ** pcdp_summary,
817     int version,
818     unsigned long *skip_update,
819     int *schedule_smooth)
820 {
821     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
822
823     /* a vector of future Holt-Winters seasonal coefs */
824     unsigned long elapsed_pdp_st;
825
826     double    interval, pre_int, post_int;  /* interval between this and
827                                              * the last run */
828     unsigned long proc_pdp_cnt;
829     unsigned long rra_start;
830
831     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
832                  current_time, current_time_usec, version) == -1) {
833         return -1;
834     }
835     /* seek to the beginning of the rra's */
836     if (*rra_current != rra_begin) {
837 #ifndef HAVE_MMAP
838         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
839             rrd_set_error("seek error in rrd");
840             return -1;
841         }
842 #endif
843         *rra_current = rra_begin;
844     }
845     rra_start = rra_begin;
846
847     interval = (double) (*current_time - rrd->live_head->last_up)
848         + (double) ((long) *current_time_usec -
849                     (long) rrd->live_head->last_up_usec) / 1e6f;
850
851     /* process the data sources and update the pdp_prep 
852      * area accordingly */
853     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
854         return -1;
855     }
856
857     elapsed_pdp_st = calculate_elapsed_steps(rrd,
858                                              *current_time,
859                                              *current_time_usec, interval,
860                                              &pre_int, &post_int,
861                                              &proc_pdp_cnt);
862
863     /* has a pdp_st moment occurred since the last run ? */
864     if (elapsed_pdp_st == 0) {
865         /* no we have not passed a pdp_st moment. therefore update is simple */
866         simple_update(rrd, interval, pdp_new);
867     } else {
868         /* an pdp_st has occurred. */
869         if (process_all_pdp_st(rrd, interval,
870                                pre_int, post_int,
871                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
872             return -1;
873         }
874         if (update_all_cdp_prep(rrd, rra_step_cnt,
875                                 rra_begin, rrd_file,
876                                 elapsed_pdp_st,
877                                 proc_pdp_cnt,
878                                 &last_seasonal_coef,
879                                 &seasonal_coef,
880                                 pdp_temp, rra_current,
881                                 skip_update, schedule_smooth) == -1) {
882             goto err_free_coefficients;
883         }
884         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
885                                  elapsed_pdp_st, pdp_temp,
886                                  &seasonal_coef) == -1) {
887             goto err_free_coefficients;
888         }
889         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
890                           rra_current, *current_time, skip_update,
891                           pcdp_summary) == -1) {
892             goto err_free_coefficients;
893         }
894     }                   /* endif a pdp_st has occurred */
895     rrd->live_head->last_up = *current_time;
896     rrd->live_head->last_up_usec = *current_time_usec;
897
898     if (version < 3) {
899         *rrd->legacy_last_up = rrd->live_head->last_up;
900     }
901     free(seasonal_coef);
902     free(last_seasonal_coef);
903     return 0;
904
905   err_free_coefficients:
906     free(seasonal_coef);
907     free(last_seasonal_coef);
908     return -1;
909 }
910
911 /*
912  * Parse a DS string (time + colon-separated values), storing the
913  * results in current_time, current_time_usec, and updvals.
914  *
915  * Returns 0 on success, -1 on error.
916  */
917 static int parse_ds(
918     rrd_t *rrd,
919     char **updvals,
920     long *tmpl_idx,
921     char *input,
922     unsigned long tmpl_cnt,
923     time_t *current_time,
924     unsigned long *current_time_usec,
925     int version)
926 {
927     char     *p;
928     unsigned long i;
929     char      timesyntax;
930
931     updvals[0] = input;
932     /* initialize all ds input to unknown except the first one
933        which has always got to be set */
934     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
935         updvals[i] = "U";
936
937     /* separate all ds elements; first must be examined separately
938        due to alternate time syntax */
939     if ((p = strchr(input, '@')) != NULL) {
940         timesyntax = '@';
941     } else if ((p = strchr(input, ':')) != NULL) {
942         timesyntax = ':';
943     } else {
944         rrd_set_error("expected timestamp not found in data source from %s",
945                       input);
946         return -1;
947     }
948     *p = '\0';
949     i = 1;
950     updvals[tmpl_idx[i++]] = p + 1;
951     while (*(++p)) {
952         if (*p == ':') {
953             *p = '\0';
954             if (i < tmpl_cnt) {
955                 updvals[tmpl_idx[i++]] = p + 1;
956             }
957         }
958     }
959
960     if (i != tmpl_cnt) {
961         rrd_set_error("expected %lu data source readings (got %lu) from %s",
962                       tmpl_cnt - 1, i, input);
963         return -1;
964     }
965
966     if (get_time_from_reading(rrd, timesyntax, updvals,
967                               current_time, current_time_usec,
968                               version) == -1) {
969         return -1;
970     }
971     return 0;
972 }
973
974 /*
975  * Parse the time in a DS string, store it in current_time and 
976  * current_time_usec and verify that it's later than the last
977  * update for this DS.
978  *
979  * Returns 0 on success, -1 on error.
980  */
981 static int get_time_from_reading(
982     rrd_t *rrd,
983     char timesyntax,
984     char **updvals,
985     time_t *current_time,
986     unsigned long *current_time_usec,
987     int version)
988 {
989     double    tmp;
990     char     *parsetime_error = NULL;
991     char     *old_locale;
992     rrd_time_value_t ds_tv;
993     struct timeval tmp_time;    /* used for time conversion */
994
995     /* get the time from the reading ... handle N */
996     if (timesyntax == '@') {    /* at-style */
997         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
998             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
999             return -1;
1000         }
1001         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1002             ds_tv.type == RELATIVE_TO_START_TIME) {
1003             rrd_set_error("specifying time relative to the 'start' "
1004                           "or 'end' makes no sense here: %s", updvals[0]);
1005             return -1;
1006         }
1007         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1008         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1009     } else if (strcmp(updvals[0], "N") == 0) {
1010         gettimeofday(&tmp_time, 0);
1011         normalize_time(&tmp_time);
1012         *current_time = tmp_time.tv_sec;
1013         *current_time_usec = tmp_time.tv_usec;
1014     } else {
1015         old_locale = setlocale(LC_NUMERIC, "C");
1016         tmp = strtod(updvals[0], 0);
1017         setlocale(LC_NUMERIC, old_locale);
1018         *current_time = floor(tmp);
1019         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1020     }
1021     /* dont do any correction for old version RRDs */
1022     if (version < 3)
1023         *current_time_usec = 0;
1024
1025     if (*current_time < rrd->live_head->last_up ||
1026         (*current_time == rrd->live_head->last_up &&
1027          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1028         rrd_set_error("illegal attempt to update using time %ld when "
1029                       "last update time is %ld (minimum one second step)",
1030                       *current_time, rrd->live_head->last_up);
1031         return -1;
1032     }
1033     return 0;
1034 }
1035
1036 /*
1037  * Update pdp_new by interpreting the updvals according to the DS type
1038  * (COUNTER, GAUGE, etc.).
1039  *
1040  * Returns 0 on success, -1 on error.
1041  */
1042 static int update_pdp_prep(
1043     rrd_t *rrd,
1044     char **updvals,
1045     rrd_value_t *pdp_new,
1046     double interval)
1047 {
1048     unsigned long ds_idx;
1049     int       ii;
1050     char     *endptr;   /* used in the conversion */
1051     double    rate;
1052     char     *old_locale;
1053     enum dst_en dst_idx;
1054
1055     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1056         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1057
1058         /* make sure we do not build diffs with old last_ds values */
1059         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1060             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1061             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1062         }
1063
1064         /* NOTE: DST_CDEF should never enter this if block, because
1065          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1066          * accidently specified a value for the DST_CDEF. To handle this case,
1067          * an extra check is required. */
1068
1069         if ((updvals[ds_idx + 1][0] != 'U') &&
1070             (dst_idx != DST_CDEF) &&
1071             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1072             rate = DNAN;
1073
1074             /* pdp_new contains rate * time ... eg the bytes transferred during
1075              * the interval. Doing it this way saves a lot of math operations
1076              */
1077             switch (dst_idx) {
1078             case DST_COUNTER:
1079             case DST_DERIVE:
1080                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1081                     if ((updvals[ds_idx + 1][ii] < '0'
1082                          || updvals[ds_idx + 1][ii] > '9')
1083                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1084                         rrd_set_error("not a simple integer: '%s'",
1085                                       updvals[ds_idx + 1]);
1086                         return -1;
1087                     }
1088                 }
1089                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1090                     pdp_new[ds_idx] =
1091                         rrd_diff(updvals[ds_idx + 1],
1092                                  rrd->pdp_prep[ds_idx].last_ds);
1093                     if (dst_idx == DST_COUNTER) {
1094                         /* simple overflow catcher. This will fail
1095                          * terribly for non 32 or 64 bit counters
1096                          * ... are there any others in SNMP land?
1097                          */
1098                         if (pdp_new[ds_idx] < (double) 0.0)
1099                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1100                         if (pdp_new[ds_idx] < (double) 0.0)
1101                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1102                     }
1103                     rate = pdp_new[ds_idx] / interval;
1104                 } else {
1105                     pdp_new[ds_idx] = DNAN;
1106                 }
1107                 break;
1108             case DST_ABSOLUTE:
1109                 old_locale = setlocale(LC_NUMERIC, "C");
1110                 errno = 0;
1111                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1112                 setlocale(LC_NUMERIC, old_locale);
1113                 if (errno > 0) {
1114                     rrd_set_error("converting '%s' to float: %s",
1115                                   updvals[ds_idx + 1], rrd_strerror(errno));
1116                     return -1;
1117                 };
1118                 if (endptr[0] != '\0') {
1119                     rrd_set_error
1120                         ("conversion of '%s' to float not complete: tail '%s'",
1121                          updvals[ds_idx + 1], endptr);
1122                     return -1;
1123                 }
1124                 rate = pdp_new[ds_idx] / interval;
1125                 break;
1126             case DST_GAUGE:
1127                 errno = 0;
1128                 old_locale = setlocale(LC_NUMERIC, "C");
1129                 pdp_new[ds_idx] =
1130                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1131                 setlocale(LC_NUMERIC, old_locale);
1132                 if (errno) {
1133                     rrd_set_error("converting '%s' to float: %s",
1134                                   updvals[ds_idx + 1], rrd_strerror(errno));
1135                     return -1;
1136                 };
1137                 if (endptr[0] != '\0') {
1138                     rrd_set_error
1139                         ("conversion of '%s' to float not complete: tail '%s'",
1140                          updvals[ds_idx + 1], endptr);
1141                     return -1;
1142                 }
1143                 rate = pdp_new[ds_idx] / interval;
1144                 break;
1145             default:
1146                 rrd_set_error("rrd contains unknown DS type : '%s'",
1147                               rrd->ds_def[ds_idx].dst);
1148                 return -1;
1149             }
1150             /* break out of this for loop if the error string is set */
1151             if (rrd_test_error()) {
1152                 return -1;
1153             }
1154             /* make sure pdp_temp is neither too large or too small
1155              * if any of these occur it becomes unknown ...
1156              * sorry folks ... */
1157             if (!isnan(rate) &&
1158                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1159                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1160                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1161                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1162                 pdp_new[ds_idx] = DNAN;
1163             }
1164         } else {
1165             /* no news is news all the same */
1166             pdp_new[ds_idx] = DNAN;
1167         }
1168
1169
1170         /* make a copy of the command line argument for the next run */
1171 #ifdef DEBUG
1172         fprintf(stderr, "prep ds[%lu]\t"
1173                 "last_arg '%s'\t"
1174                 "this_arg '%s'\t"
1175                 "pdp_new %10.2f\n",
1176                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1177                 pdp_new[ds_idx]);
1178 #endif
1179         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1180                 LAST_DS_LEN - 1);
1181         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1182     }
1183     return 0;
1184 }
1185
1186 /*
1187  * How many PDP steps have elapsed since the last update? Returns the answer,
1188  * and stores the time between the last update and the last PDP in pre_time,
1189  * and the time between the last PDP and the current time in post_int.
1190  */
1191 static int calculate_elapsed_steps(
1192     rrd_t *rrd,
1193     unsigned long current_time,
1194     unsigned long current_time_usec,
1195     double interval,
1196     double *pre_int,
1197     double *post_int,
1198     unsigned long *proc_pdp_cnt)
1199 {
1200     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1201     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1202                                  * time */
1203     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1204                                  * when it was last updated */
1205     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1206
1207     /* when was the current pdp started */
1208     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1209     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1210
1211     /* when did the last pdp_st occur */
1212     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1213     occu_pdp_st = current_time - occu_pdp_age;
1214
1215     if (occu_pdp_st > proc_pdp_st) {
1216         /* OK we passed the pdp_st moment */
1217         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1218                                                                      * occurred before the latest
1219                                                                      * pdp_st moment*/
1220         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1221         *post_int = occu_pdp_age;   /* how much after it */
1222         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1223     } else {
1224         *pre_int = interval;
1225         *post_int = 0;
1226     }
1227
1228     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1229
1230 #ifdef DEBUG
1231     printf("proc_pdp_age %lu\t"
1232            "proc_pdp_st %lu\t"
1233            "occu_pfp_age %lu\t"
1234            "occu_pdp_st %lu\t"
1235            "int %lf\t"
1236            "pre_int %lf\t"
1237            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1238            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1239 #endif
1240
1241     /* compute the number of elapsed pdp_st moments */
1242     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1243 }
1244
1245 /*
1246  * Increment the PDP values by the values in pdp_new, or else initialize them.
1247  */
1248 static void simple_update(
1249     rrd_t *rrd,
1250     double interval,
1251     rrd_value_t *pdp_new)
1252 {
1253     int       i;
1254
1255     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1256         if (isnan(pdp_new[i])) {
1257             /* this is not really accurate if we use subsecond data arrival time
1258                should have thought of it when going subsecond resolution ...
1259                sorry next format change we will have it! */
1260             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1261                 floor(interval);
1262         } else {
1263             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1264                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1265             } else {
1266                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1267             }
1268         }
1269 #ifdef DEBUG
1270         fprintf(stderr,
1271                 "NO PDP  ds[%i]\t"
1272                 "value %10.2f\t"
1273                 "unkn_sec %5lu\n",
1274                 i,
1275                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1276                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1277 #endif
1278     }
1279 }
1280
1281 /*
1282  * Call process_pdp_st for each DS.
1283  *
1284  * Returns 0 on success, -1 on error.
1285  */
1286 static int process_all_pdp_st(
1287     rrd_t *rrd,
1288     double interval,
1289     double pre_int,
1290     double post_int,
1291     unsigned long elapsed_pdp_st,
1292     rrd_value_t *pdp_new,
1293     rrd_value_t *pdp_temp)
1294 {
1295     unsigned long ds_idx;
1296
1297     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1298        rate*seconds which occurred up to the last run.
1299        pdp_new[] contains rate*seconds from the latest run.
1300        pdp_temp[] will contain the rate for cdp */
1301
1302     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1303         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1304                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1305                            pdp_new, pdp_temp) == -1) {
1306             return -1;
1307         }
1308 #ifdef DEBUG
1309         fprintf(stderr, "PDP UPD ds[%lu]\t"
1310                 "elapsed_pdp_st %lu\t"
1311                 "pdp_temp %10.2f\t"
1312                 "new_prep %10.2f\t"
1313                 "new_unkn_sec %5lu\n",
1314                 ds_idx,
1315                 elapsed_pdp_st,
1316                 pdp_temp[ds_idx],
1317                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1318                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1319 #endif
1320     }
1321     return 0;
1322 }
1323
1324 /*
1325  * Process an update that occurs after one of the PDP moments.
1326  * Increments the PDP value, sets NAN if time greater than the
1327  * heartbeats have elapsed, processes CDEFs.
1328  *
1329  * Returns 0 on success, -1 on error.
1330  */
1331 static int process_pdp_st(
1332     rrd_t *rrd,
1333     unsigned long ds_idx,
1334     double interval,
1335     double pre_int,
1336     double post_int,
1337     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1338     rrd_value_t *pdp_new,
1339     rrd_value_t *pdp_temp)
1340 {
1341     int       i;
1342
1343     /* update pdp_prep to the current pdp_st. */
1344     double    pre_unknown = 0.0;
1345     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1346     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1347
1348     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1349
1350     rpnstack_init(&rpnstack);
1351
1352
1353     if (isnan(pdp_new[ds_idx])) {
1354         /* a final bit of unknown to be added before calculation
1355            we use a temporary variable for this so that we
1356            don't have to turn integer lines before using the value */
1357         pre_unknown = pre_int;
1358     } else {
1359         if (isnan(scratch[PDP_val].u_val)) {
1360             scratch[PDP_val].u_val = 0;
1361         }
1362         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1363     }
1364
1365     /* if too much of the pdp_prep is unknown we dump it */
1366     /* if the interval is larger thatn mrhb we get NAN */
1367     if ((interval > mrhb) ||
1368         (rrd->stat_head->pdp_step / 2.0 <
1369          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1370         pdp_temp[ds_idx] = DNAN;
1371     } else {
1372         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1373             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1374              pre_unknown);
1375     }
1376
1377     /* process CDEF data sources; remember each CDEF DS can
1378      * only reference other DS with a lower index number */
1379     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1380         rpnp_t   *rpnp;
1381
1382         rpnp =
1383             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1384         /* substitute data values for OP_VARIABLE nodes */
1385         for (i = 0; rpnp[i].op != OP_END; i++) {
1386             if (rpnp[i].op == OP_VARIABLE) {
1387                 rpnp[i].op = OP_NUMBER;
1388                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1389             }
1390         }
1391         /* run the rpn calculator */
1392         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1393             free(rpnp);
1394             rpnstack_free(&rpnstack);
1395             return -1;
1396         }
1397     }
1398
1399     /* make pdp_prep ready for the next run */
1400     if (isnan(pdp_new[ds_idx])) {
1401         /* this is not realy accurate if we use subsecond data arival time
1402            should have thought of it when going subsecond resolution ...
1403            sorry next format change we will have it! */
1404         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1405         scratch[PDP_val].u_val = DNAN;
1406     } else {
1407         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1408         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1409     }
1410     rpnstack_free(&rpnstack);
1411     return 0;
1412 }
1413
1414 /*
1415  * Iterate over all the RRAs for a given DS and:
1416  * 1. Decide whether to schedule a smooth later
1417  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1418  * 3. Update the CDP
1419  *
1420  * Returns 0 on success, -1 on error
1421  */
1422 static int update_all_cdp_prep(
1423     rrd_t *rrd,
1424     unsigned long *rra_step_cnt,
1425     unsigned long rra_begin,
1426     rrd_file_t *rrd_file,
1427     unsigned long elapsed_pdp_st,
1428     unsigned long proc_pdp_cnt,
1429     rrd_value_t **last_seasonal_coef,
1430     rrd_value_t **seasonal_coef,
1431     rrd_value_t *pdp_temp,
1432     unsigned long *rra_current,
1433     unsigned long *skip_update,
1434     int *schedule_smooth)
1435 {
1436     unsigned long rra_idx;
1437
1438     /* index into the CDP scratch array */
1439     enum cf_en current_cf;
1440     unsigned long rra_start;
1441
1442     /* number of rows to be updated in an RRA for a data value. */
1443     unsigned long start_pdp_offset;
1444
1445     rra_start = rra_begin;
1446     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1447         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1448         start_pdp_offset =
1449             rrd->rra_def[rra_idx].pdp_cnt -
1450             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1451         skip_update[rra_idx] = 0;
1452         if (start_pdp_offset <= elapsed_pdp_st) {
1453             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1454                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1455         } else {
1456             rra_step_cnt[rra_idx] = 0;
1457         }
1458
1459         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1460             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1461              * so that they will be correct for the next observed value; note that for
1462              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1463              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1464             if (rra_step_cnt[rra_idx] > 1) {
1465                 skip_update[rra_idx] = 1;
1466                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1467                                 elapsed_pdp_st, last_seasonal_coef);
1468                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1469                                 elapsed_pdp_st + 1, seasonal_coef);
1470             }
1471             /* periodically run a smoother for seasonal effects */
1472             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1473 #ifdef DEBUG
1474                 fprintf(stderr,
1475                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1476                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1477                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1478                         u_cnt);
1479 #endif
1480                 *schedule_smooth = 1;
1481             }
1482             *rra_current = rrd_tell(rrd_file);
1483         }
1484         if (rrd_test_error())
1485             return -1;
1486
1487         if (update_cdp_prep
1488             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1489              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1490              current_cf) == -1) {
1491             return -1;
1492         }
1493         rra_start +=
1494             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1495             sizeof(rrd_value_t);
1496     }
1497     return 0;
1498 }
1499
1500 /* 
1501  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1502  */
1503 static int do_schedule_smooth(
1504     rrd_t *rrd,
1505     unsigned long rra_idx,
1506     unsigned long elapsed_pdp_st)
1507 {
1508     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1509     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1510     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1511     unsigned long seasonal_smooth_idx =
1512         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1513     unsigned long *init_seasonal =
1514         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1515
1516     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1517      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1518      * really an RRA level, not a data source within RRA level parameter, but
1519      * the rra_def is read only for rrd_update (not flushed to disk). */
1520     if (*init_seasonal > BURNIN_CYCLES) {
1521         /* someone has no doubt invented a trick to deal with this wrap around,
1522          * but at least this code is clear. */
1523         if (seasonal_smooth_idx > cur_row) {
1524             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1525              * between PDP and CDP */
1526             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1527         }
1528         /* can't rely on negative numbers because we are working with
1529          * unsigned values */
1530         return (cur_row + elapsed_pdp_st >= row_cnt
1531                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1532     }
1533     /* mark off one of the burn-in cycles */
1534     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1535 }
1536
1537 /*
1538  * For a given RRA, iterate over the data sources and call the appropriate
1539  * consolidation function.
1540  *
1541  * Returns 0 on success, -1 on error.
1542  */
1543 static int update_cdp_prep(
1544     rrd_t *rrd,
1545     unsigned long elapsed_pdp_st,
1546     unsigned long start_pdp_offset,
1547     unsigned long *rra_step_cnt,
1548     int rra_idx,
1549     rrd_value_t *pdp_temp,
1550     rrd_value_t *last_seasonal_coef,
1551     rrd_value_t *seasonal_coef,
1552     int current_cf)
1553 {
1554     unsigned long ds_idx, cdp_idx;
1555
1556     /* update CDP_PREP areas */
1557     /* loop over data soures within each RRA */
1558     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1559
1560         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1561
1562         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1563             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1564                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1565                        elapsed_pdp_st, start_pdp_offset,
1566                        rrd->rra_def[rra_idx].pdp_cnt,
1567                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1568                        rra_idx, ds_idx);
1569         } else {
1570             /* Nothing to consolidate if there's one PDP per CDP. However, if
1571              * we've missed some PDPs, let's update null counters etc. */
1572             if (elapsed_pdp_st > 2) {
1573                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1574                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1575                           current_cf);
1576             }
1577         }
1578
1579         if (rrd_test_error())
1580             return -1;
1581     }                   /* endif data sources loop */
1582     return 0;
1583 }
1584
1585 /*
1586  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1587  * primary value, secondary value, and # of unknowns.
1588  */
1589 static void update_cdp(
1590     unival *scratch,
1591     int current_cf,
1592     rrd_value_t pdp_temp_val,
1593     unsigned long rra_step_cnt,
1594     unsigned long elapsed_pdp_st,
1595     unsigned long start_pdp_offset,
1596     unsigned long pdp_cnt,
1597     rrd_value_t xff,
1598     int i,
1599     int ii)
1600 {
1601     /* shorthand variables */
1602     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1603     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1604     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1605     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1606
1607     if (rra_step_cnt) {
1608         /* If we are in this block, as least 1 CDP value will be written to
1609          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1610          * to be written, then the "fill in" value is the CDP_secondary_val
1611          * entry. */
1612         if (isnan(pdp_temp_val)) {
1613             *cdp_unkn_pdp_cnt += start_pdp_offset;
1614             *cdp_secondary_val = DNAN;
1615         } else {
1616             /* CDP_secondary value is the RRA "fill in" value for intermediary
1617              * CDP data entries. No matter the CF, the value is the same because
1618              * the average, max, min, and last of a list of identical values is
1619              * the same, namely, the value itself. */
1620             *cdp_secondary_val = pdp_temp_val;
1621         }
1622
1623         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1624             *cdp_primary_val = DNAN;
1625             if (current_cf == CF_AVERAGE) {
1626                 *cdp_val =
1627                     initialize_average_carry_over(pdp_temp_val,
1628                                                   elapsed_pdp_st,
1629                                                   start_pdp_offset, pdp_cnt);
1630             } else {
1631                 *cdp_val = pdp_temp_val;
1632             }
1633         } else {
1634             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1635                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1636         }               /* endif meets xff value requirement for a valid value */
1637         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1638          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1639         if (isnan(pdp_temp_val))
1640             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1641         else
1642             *cdp_unkn_pdp_cnt = 0;
1643     } else {            /* rra_step_cnt[i]  == 0 */
1644
1645 #ifdef DEBUG
1646         if (isnan(*cdp_val)) {
1647             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1648                     i, ii);
1649         } else {
1650             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1651                     i, ii, *cdp_val);
1652         }
1653 #endif
1654         if (isnan(pdp_temp_val)) {
1655             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1656         } else {
1657             *cdp_val =
1658                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1659                                   current_cf, i, ii);
1660         }
1661     }
1662 }
1663
1664 /*
1665  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1666  * on the type of consolidation function.
1667  */
1668 static void initialize_cdp_val(
1669     unival *scratch,
1670     int current_cf,
1671     rrd_value_t pdp_temp_val,
1672     unsigned long elapsed_pdp_st,
1673     unsigned long start_pdp_offset,
1674     unsigned long pdp_cnt)
1675 {
1676     rrd_value_t cum_val, cur_val;
1677
1678     switch (current_cf) {
1679     case CF_AVERAGE:
1680         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1681         cur_val = IFDNAN(pdp_temp_val, 0.0);
1682         scratch[CDP_primary_val].u_val =
1683             (cum_val + cur_val * start_pdp_offset) /
1684             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1685         scratch[CDP_val].u_val =
1686             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1687                                           start_pdp_offset, pdp_cnt);
1688         break;
1689     case CF_MAXIMUM:
1690         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1691         cur_val = IFDNAN(pdp_temp_val, -DINF);
1692 #if 0
1693 #ifdef DEBUG
1694         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1695             fprintf(stderr,
1696                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1697                     i, ii);
1698             exit(-1);
1699         }
1700 #endif
1701 #endif
1702         if (cur_val > cum_val)
1703             scratch[CDP_primary_val].u_val = cur_val;
1704         else
1705             scratch[CDP_primary_val].u_val = cum_val;
1706         /* initialize carry over value */
1707         scratch[CDP_val].u_val = pdp_temp_val;
1708         break;
1709     case CF_MINIMUM:
1710         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1711         cur_val = IFDNAN(pdp_temp_val, DINF);
1712 #if 0
1713 #ifdef DEBUG
1714         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1715             fprintf(stderr,
1716                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1717                     ii);
1718             exit(-1);
1719         }
1720 #endif
1721 #endif
1722         if (cur_val < cum_val)
1723             scratch[CDP_primary_val].u_val = cur_val;
1724         else
1725             scratch[CDP_primary_val].u_val = cum_val;
1726         /* initialize carry over value */
1727         scratch[CDP_val].u_val = pdp_temp_val;
1728         break;
1729     case CF_LAST:
1730     default:
1731         scratch[CDP_primary_val].u_val = pdp_temp_val;
1732         /* initialize carry over value */
1733         scratch[CDP_val].u_val = pdp_temp_val;
1734         break;
1735     }
1736 }
1737
1738 /*
1739  * Update the consolidation function for Holt-Winters functions as
1740  * well as other functions that don't actually consolidate multiple
1741  * PDPs.
1742  */
1743 static void reset_cdp(
1744     rrd_t *rrd,
1745     unsigned long elapsed_pdp_st,
1746     rrd_value_t *pdp_temp,
1747     rrd_value_t *last_seasonal_coef,
1748     rrd_value_t *seasonal_coef,
1749     int rra_idx,
1750     int ds_idx,
1751     int cdp_idx,
1752     enum cf_en current_cf)
1753 {
1754     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1755
1756     switch (current_cf) {
1757     case CF_AVERAGE:
1758     default:
1759         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1760         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1761         break;
1762     case CF_SEASONAL:
1763     case CF_DEVSEASONAL:
1764         /* need to update cached seasonal values, so they are consistent
1765          * with the bulk update */
1766         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1767          * CDP_last_deviation are the same. */
1768         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1769         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1770         break;
1771     case CF_HWPREDICT:
1772     case CF_MHWPREDICT:
1773         /* need to update the null_count and last_null_count.
1774          * even do this for non-DNAN pdp_temp because the
1775          * algorithm is not learning from batch updates. */
1776         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1777         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1778         /* fall through */
1779     case CF_DEVPREDICT:
1780         scratch[CDP_primary_val].u_val = DNAN;
1781         scratch[CDP_secondary_val].u_val = DNAN;
1782         break;
1783     case CF_FAILURES:
1784         /* do not count missed bulk values as failures */
1785         scratch[CDP_primary_val].u_val = 0;
1786         scratch[CDP_secondary_val].u_val = 0;
1787         /* need to reset violations buffer.
1788          * could do this more carefully, but for now, just
1789          * assume a bulk update wipes away all violations. */
1790         erase_violations(rrd, cdp_idx, rra_idx);
1791         break;
1792     }
1793 }
1794
1795 static rrd_value_t initialize_average_carry_over(
1796     rrd_value_t pdp_temp_val,
1797     unsigned long elapsed_pdp_st,
1798     unsigned long start_pdp_offset,
1799     unsigned long pdp_cnt)
1800 {
1801     /* initialize carry over value */
1802     if (isnan(pdp_temp_val)) {
1803         return DNAN;
1804     }
1805     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1806 }
1807
1808 /*
1809  * Update or initialize a CDP value based on the consolidation
1810  * function.
1811  *
1812  * Returns the new value.
1813  */
1814 static rrd_value_t calculate_cdp_val(
1815     rrd_value_t cdp_val,
1816     rrd_value_t pdp_temp_val,
1817     unsigned long elapsed_pdp_st,
1818     int current_cf,
1819 #ifdef DEBUG
1820     int i,
1821     int ii
1822 #else
1823     int UNUSED(i),
1824     int UNUSED(ii)
1825 #endif
1826     )
1827 {
1828     if (isnan(cdp_val)) {
1829         if (current_cf == CF_AVERAGE) {
1830             pdp_temp_val *= elapsed_pdp_st;
1831         }
1832 #ifdef DEBUG
1833         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1834                 i, ii, pdp_temp_val);
1835 #endif
1836         return pdp_temp_val;
1837     }
1838     if (current_cf == CF_AVERAGE)
1839         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1840     if (current_cf == CF_MINIMUM)
1841         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1842     if (current_cf == CF_MAXIMUM)
1843         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1844
1845     return pdp_temp_val;
1846 }
1847
1848 /*
1849  * For each RRA, update the seasonal values and then call update_aberrant_CF
1850  * for each data source.
1851  *
1852  * Return 0 on success, -1 on error.
1853  */
1854 static int update_aberrant_cdps(
1855     rrd_t *rrd,
1856     rrd_file_t *rrd_file,
1857     unsigned long rra_begin,
1858     unsigned long *rra_current,
1859     unsigned long elapsed_pdp_st,
1860     rrd_value_t *pdp_temp,
1861     rrd_value_t **seasonal_coef)
1862 {
1863     unsigned long rra_idx, ds_idx, j;
1864
1865     /* number of PDP steps since the last update that
1866      * are assigned to the first CDP to be generated
1867      * since the last update. */
1868     unsigned short scratch_idx;
1869     unsigned long rra_start;
1870     enum cf_en current_cf;
1871
1872     /* this loop is only entered if elapsed_pdp_st < 3 */
1873     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1874          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1875         rra_start = rra_begin;
1876         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1878                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1879                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1880                     if (scratch_idx == CDP_primary_val) {
1881                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1882                                         elapsed_pdp_st + 1, seasonal_coef);
1883                     } else {
1884                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1885                                         elapsed_pdp_st + 2, seasonal_coef);
1886                     }
1887                     *rra_current = rrd_tell(rrd_file);
1888                 }
1889                 if (rrd_test_error())
1890                     return -1;
1891                 /* loop over data soures within each RRA */
1892                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1893                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1894                                        rra_idx * (rrd->stat_head->ds_cnt) +
1895                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1896                                        *seasonal_coef);
1897                 }
1898             }
1899             rra_start += rrd->rra_def[rra_idx].row_cnt
1900                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1901         }
1902     }
1903     return 0;
1904 }
1905
1906 /* 
1907  * Move sequentially through the file, writing one RRA at a time.  Note this
1908  * architecture divorces the computation of CDP with flushing updated RRA
1909  * entries to disk.
1910  *
1911  * Return 0 on success, -1 on error.
1912  */
1913 static int write_to_rras(
1914     rrd_t *rrd,
1915     rrd_file_t *rrd_file,
1916     unsigned long *rra_step_cnt,
1917     unsigned long rra_begin,
1918     unsigned long *rra_current,
1919     time_t current_time,
1920     unsigned long *skip_update,
1921     rrd_info_t ** pcdp_summary)
1922 {
1923     unsigned long rra_idx;
1924     unsigned long rra_start;
1925     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1926     time_t    rra_time = 0; /* time of update for a RRA */
1927
1928     /* Ready to write to disk */
1929     rra_start = rra_begin;
1930     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1931         /* skip unless there's something to write */
1932         if (rra_step_cnt[rra_idx]) {
1933             /* write the first row */
1934 #ifdef DEBUG
1935             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1936 #endif
1937             rrd->rra_ptr[rra_idx].cur_row++;
1938             if (rrd->rra_ptr[rra_idx].cur_row >=
1939                 rrd->rra_def[rra_idx].row_cnt)
1940                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1941             /* position on the first row */
1942             rra_pos_tmp = rra_start +
1943                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1944                 sizeof(rrd_value_t);
1945             if (rra_pos_tmp != *rra_current) {
1946                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1947                     rrd_set_error("seek error in rrd");
1948                     return -1;
1949                 }
1950                 *rra_current = rra_pos_tmp;
1951             }
1952 #ifdef DEBUG
1953             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1954 #endif
1955             if (!skip_update[rra_idx]) {
1956                 if (*pcdp_summary != NULL) {
1957                     rra_time = (current_time - current_time
1958                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1959                                    rrd->stat_head->pdp_step))
1960                         -
1961                         ((rra_step_cnt[rra_idx] -
1962                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1963                          rrd->stat_head->pdp_step);
1964                 }
1965                 if (write_RRA_row
1966                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1967                      pcdp_summary, rra_time) == -1)
1968                     return -1;
1969             }
1970
1971             /* write other rows of the bulk update, if any */
1972             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1973                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1974                     rrd->rra_def[rra_idx].row_cnt) {
1975 #ifdef DEBUG
1976                     fprintf(stderr,
1977                             "Wraparound for RRA %s, %lu updates left\n",
1978                             rrd->rra_def[rra_idx].cf_nam,
1979                             rra_step_cnt[rra_idx] - 1);
1980 #endif
1981                     /* wrap */
1982                     rrd->rra_ptr[rra_idx].cur_row = 0;
1983                     /* seek back to beginning of current rra */
1984                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1985                         rrd_set_error("seek error in rrd");
1986                         return -1;
1987                     }
1988 #ifdef DEBUG
1989                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1990                             rrd_file->pos);
1991 #endif
1992                     *rra_current = rra_start;
1993                 }
1994                 if (!skip_update[rra_idx]) {
1995                     if (*pcdp_summary != NULL) {
1996                         rra_time = (current_time - current_time
1997                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1998                                        rrd->stat_head->pdp_step))
1999                             -
2000                             ((rra_step_cnt[rra_idx] -
2001                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2002                              rrd->stat_head->pdp_step);
2003                     }
2004                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2005                                       CDP_secondary_val, pcdp_summary,
2006                                       rra_time) == -1)
2007                         return -1;
2008                 }
2009             }
2010         }
2011         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2012             sizeof(rrd_value_t);
2013     }                   /* RRA LOOP */
2014
2015     return 0;
2016 }
2017
2018 /*
2019  * Write out one row of values (one value per DS) to the archive.
2020  *
2021  * Returns 0 on success, -1 on error.
2022  */
2023 static int write_RRA_row(
2024     rrd_file_t *rrd_file,
2025     rrd_t *rrd,
2026     unsigned long rra_idx,
2027     unsigned long *rra_current,
2028     unsigned short CDP_scratch_idx,
2029     rrd_info_t ** pcdp_summary,
2030     time_t rra_time)
2031 {
2032     unsigned long ds_idx, cdp_idx;
2033     rrd_infoval_t iv;
2034
2035     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2036         /* compute the cdp index */
2037         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2038 #ifdef DEBUG
2039         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2040                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2041                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2042 #endif
2043         if (*pcdp_summary != NULL) {
2044             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2045             /* append info to the return hash */
2046             *pcdp_summary = rrd_info_push(*pcdp_summary,
2047                                           sprintf_alloc
2048                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2049                                            rrd->rra_def[rra_idx].cf_nam,
2050                                            rrd->rra_def[rra_idx].pdp_cnt,
2051                                            rrd->ds_def[ds_idx].ds_nam),
2052                                           RD_I_VAL, iv);
2053         }
2054         if (rrd_write(rrd_file,
2055                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2056                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2057             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2058             return -1;
2059         }
2060         *rra_current += sizeof(rrd_value_t);
2061     }
2062     return 0;
2063 }
2064
2065 /*
2066  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2067  *
2068  * Returns 0 on success, -1 otherwise
2069  */
2070 static int smooth_all_rras(
2071     rrd_t *rrd,
2072     rrd_file_t *rrd_file,
2073     unsigned long rra_begin)
2074 {
2075     unsigned long rra_start = rra_begin;
2076     unsigned long rra_idx;
2077
2078     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2079         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2080             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2081 #ifdef DEBUG
2082             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2083 #endif
2084             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2085             if (rrd_test_error())
2086                 return -1;
2087         }
2088         rra_start += rrd->rra_def[rra_idx].row_cnt
2089             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2090     }
2091     return 0;
2092 }
2093
2094 #ifndef HAVE_MMAP
2095 /*
2096  * Flush changes to disk (unless we're using mmap)
2097  *
2098  * Returns 0 on success, -1 otherwise
2099  */
2100 static int write_changes_to_disk(
2101     rrd_t *rrd,
2102     rrd_file_t *rrd_file,
2103     int version)
2104 {
2105     /* we just need to write back the live header portion now */
2106     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2107                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2108                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2109                  SEEK_SET) != 0) {
2110         rrd_set_error("seek rrd for live header writeback");
2111         return -1;
2112     }
2113     if (version >= 3) {
2114         if (rrd_write(rrd_file, rrd->live_head,
2115                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2116             rrd_set_error("rrd_write live_head to rrd");
2117             return -1;
2118         }
2119     } else {
2120         if (rrd_write(rrd_file, rrd->legacy_last_up,
2121                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2122             rrd_set_error("rrd_write live_head to rrd");
2123             return -1;
2124         }
2125     }
2126
2127
2128     if (rrd_write(rrd_file, rrd->pdp_prep,
2129                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2130         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2131         rrd_set_error("rrd_write pdp_prep to rrd");
2132         return -1;
2133     }
2134
2135     if (rrd_write(rrd_file, rrd->cdp_prep,
2136                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2137                   rrd->stat_head->ds_cnt)
2138         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2139                       rrd->stat_head->ds_cnt)) {
2140
2141         rrd_set_error("rrd_write cdp_prep to rrd");
2142         return -1;
2143     }
2144
2145     if (rrd_write(rrd_file, rrd->rra_ptr,
2146                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2147         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2148         rrd_set_error("rrd_write rra_ptr to rrd");
2149         return -1;
2150     }
2151     return 0;
2152 }
2153 #endif