d978822ee1198dba9da1a635b073a37e6181f4a4
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 static int send_values_to_daemon (const char *addr, const char *file,
322         int values_num, const char * const *values, int silent)
323 {
324     int status;
325
326     status = rrdc_connect ((addr != NULL) ? addr : RRDD_SOCK_PATH);
327     if (status != 0)
328     {
329         if (!silent)
330         {
331             rrd_set_error("Unable to connect to daemon: %s",
332                     (status < 0)
333                     ? "Internal error"
334                     : rrd_strerror (status));
335         }
336         return (status);
337     }
338
339     status = rrdc_update (file, values_num, values);
340     if (status != 0)
341     {
342         if (!silent)
343         {
344             rrd_set_error("Failed sending the values to the daemon: %s",
345                     (status < 0)
346                     ? "Internal error"
347                     : rrd_strerror (status));
348         }
349         rrdc_disconnect ();
350         return (status);
351     }
352
353     rrdc_disconnect ();
354     return (0);
355 } /* int send_values_to_daemon */
356
357 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
358
359 rrd_info_t *rrd_update_v(
360     int argc,
361     char **argv)
362 {
363     char     *tmplt = NULL;
364     rrd_info_t *result = NULL;
365     rrd_infoval_t rc;
366     struct option long_options[] = {
367         {"template", required_argument, 0, 't'},
368         {0, 0, 0, 0}
369     };
370
371     rc.u_int = -1;
372     optind = 0;
373     opterr = 0;         /* initialize getopt */
374
375     while (1) {
376         int       option_index = 0;
377         int       opt;
378
379         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
380
381         if (opt == EOF)
382             break;
383
384         switch (opt) {
385         case 't':
386             tmplt = optarg;
387             break;
388
389         case '?':
390             rrd_set_error("unknown option '%s'", argv[optind - 1]);
391             goto end_tag;
392         }
393     }
394
395     /* need at least 2 arguments: filename, data. */
396     if (argc - optind < 2) {
397         rrd_set_error("Not enough arguments");
398         goto end_tag;
399     }
400     rc.u_int = 0;
401     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
402     rc.u_int = _rrd_update(argv[optind], tmplt,
403                            argc - optind - 1,
404                            (const char **) (argv + optind + 1), result);
405     result->value.u_int = rc.u_int;
406   end_tag:
407     return result;
408 }
409
410 int rrd_update(
411     int argc,
412     char **argv)
413 {
414     struct option long_options[] = {
415         {"template", required_argument, 0, 't'},
416         {"cache",    optional_argument, 0, 'c'},
417         {"nocache",  no_argument      , 0, 'n'},
418         {"daemon",   required_argument, 0, 'd'},
419         {0, 0, 0, 0}
420     };
421     int       option_index = 0;
422     int       opt;
423     char     *tmplt = NULL;
424     int       rc = -1;
425     /* force_cache: 0 = default; -1 = force no cache; 1 = force cache */
426     int       force_cache = 0;
427     char     *daemon_address = NULL;
428
429     optind = 0;
430     opterr = 0;         /* initialize getopt */
431
432     while (1) {
433         opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
434
435         if (opt == EOF)
436             break;
437
438         switch (opt) {
439         case 't':
440             tmplt = strdup(optarg);
441             break;
442
443         case 'c':
444             if (optarg != NULL)
445             {
446                 if (strcmp ("no", optarg) == 0)
447                     force_cache = -1;
448                 else if (strcmp ("yes", optarg) == 0)
449                     force_cache = 1;
450                 else
451                 {
452                     rrd_set_error ("option 'cache' must be "
453                             "either 'yes' or 'no'.");
454                     goto out;
455                 }
456             }
457             else
458                 force_cache = 1;
459             break;
460
461         case 'n':
462             force_cache = -1;
463             break;
464
465         case 'd':
466             if (daemon_address != NULL)
467                 free (daemon_address);
468             daemon_address = strdup (optarg);
469             if (daemon_address == NULL)
470             {
471                 rrd_set_error("strdup failed.");
472                 goto out;
473             }
474             break;
475
476         case '?':
477             rrd_set_error("unknown option '%s'", argv[optind - 1]);
478             goto out;
479         }
480     }
481
482     /* need at least 2 arguments: filename, data. */
483     if (argc - optind < 2) {
484         rrd_set_error("Not enough arguments");
485         goto out;
486     }
487
488     if ((tmplt != NULL) && (force_cache > 0))
489     {
490         rrd_set_error("The caching daemon cannot be used together with "
491                 "templates yet.");
492         goto out;
493     }
494
495     if ((tmplt == NULL) && (force_cache >= 0))
496     {
497         int status;
498
499         status = send_values_to_daemon (daemon_address,
500                 /* file = */ argv[optind],
501                 /* values_num = */ argc - optind - 1,
502                 /* values = */ (void *) (argv + optind + 1),
503                 /* silent = */ (force_cache > 0) ? 0 : 1);
504         if ((status == 0) || (force_cache > 0))
505             goto out;
506     } /* if ((tmplt != NULL) && (force_cache >= 0)) */
507
508     rc = rrd_update_r(argv[optind], tmplt,
509                       argc - optind - 1, (const char **) (argv + optind + 1));
510   out:
511     if (tmplt != NULL)
512     {
513         free(tmplt);
514         tmplt = NULL;
515     }
516     if (daemon_address != NULL)
517     {
518         free (daemon_address);
519         daemon_address = NULL;
520     }
521     return rc;
522 }
523
524 int rrd_update_r(
525     const char *filename,
526     const char *tmplt,
527     int argc,
528     const char **argv)
529 {
530     return _rrd_update(filename, tmplt, argc, argv, NULL);
531 }
532
533 int _rrd_update(
534     const char *filename,
535     const char *tmplt,
536     int argc,
537     const char **argv,
538     rrd_info_t * pcdp_summary)
539 {
540
541     int       arg_i = 2;
542
543     unsigned long rra_begin;    /* byte pointer to the rra
544                                  * area in the rrd file.  this
545                                  * pointer never changes value */
546     unsigned long rra_current;  /* byte pointer to the current write
547                                  * spot in the rrd file. */
548     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
549                              * to the existing entry */
550     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
551                              * to the cdp values */
552
553     long     *tmpl_idx; /* index representing the settings
554                          * transported by the tmplt index */
555     unsigned long tmpl_cnt = 2; /* time and data */
556     rrd_t     rrd;
557     time_t    current_time = 0;
558     unsigned long current_time_usec = 0;    /* microseconds part of current time */
559     char    **updvals;
560     int       schedule_smooth = 0;
561
562     /* number of elapsed PDP steps since last update */
563     unsigned long *rra_step_cnt = NULL;
564
565     int       version;  /* rrd version */
566     rrd_file_t *rrd_file;
567     char     *arg_copy; /* for processing the argv */
568     unsigned long *skip_update; /* RRAs to advance but not write */
569
570     /* need at least 1 arguments: data. */
571     if (argc < 1) {
572         rrd_set_error("Not enough arguments");
573         goto err_out;
574     }
575
576     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
577         goto err_free;
578     }
579     /* We are now at the beginning of the rra's */
580     rra_current = rra_begin = rrd_file->header_len;
581
582     version = atoi(rrd.stat_head->version);
583
584     initialize_time(&current_time, &current_time_usec, version);
585
586     /* get exclusive lock to whole file.
587      * lock gets removed when we close the file.
588      */
589     if (rrd_lock(rrd_file) != 0) {
590         rrd_set_error("could not lock RRD");
591         goto err_close;
592     }
593
594     if (allocate_data_structures(&rrd, &updvals,
595                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
596                                  &rra_step_cnt, &skip_update,
597                                  &pdp_new) == -1) {
598         goto err_close;
599     }
600
601     /* loop through the arguments. */
602     for (arg_i = 0; arg_i < argc; arg_i++) {
603         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
604             rrd_set_error("failed duplication argv entry");
605             break;
606         }
607         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
608                         &current_time, &current_time_usec, pdp_temp, pdp_new,
609                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
610                         &pcdp_summary, version, skip_update,
611                         &schedule_smooth) == -1) {
612             free(arg_copy);
613             break;
614         }
615         free(arg_copy);
616     }
617
618     free(rra_step_cnt);
619
620     /* if we got here and if there is an error and if the file has not been
621      * written to, then close things up and return. */
622     if (rrd_test_error()) {
623         goto err_free_structures;
624     }
625 #ifndef HAVE_MMAP
626     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
627         goto err_free_structures;
628     }
629 #endif
630
631     /* calling the smoothing code here guarantees at most one smoothing
632      * operation per rrd_update call. Unfortunately, it is possible with bulk
633      * updates, or a long-delayed update for smoothing to occur off-schedule.
634      * This really isn't critical except during the burn-in cycles. */
635     if (schedule_smooth) {
636         smooth_all_rras(&rrd, rrd_file, rra_begin);
637     }
638
639 /*    rrd_dontneed(rrd_file,&rrd); */
640     rrd_free(&rrd);
641     rrd_close(rrd_file);
642
643     free(pdp_new);
644     free(tmpl_idx);
645     free(pdp_temp);
646     free(skip_update);
647     free(updvals);
648     return 0;
649
650   err_free_structures:
651     free(pdp_new);
652     free(tmpl_idx);
653     free(pdp_temp);
654     free(skip_update);
655     free(updvals);
656   err_close:
657     rrd_close(rrd_file);
658   err_free:
659     rrd_free(&rrd);
660   err_out:
661     return -1;
662 }
663
664 /*
665  * get exclusive lock to whole file.
666  * lock gets removed when we close the file
667  *
668  * returns 0 on success
669  */
670 int rrd_lock(
671     rrd_file_t *file)
672 {
673     int       rcstat;
674
675     {
676 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
677         struct _stat st;
678
679         if (_fstat(file->fd, &st) == 0) {
680             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
681         } else {
682             rcstat = -1;
683         }
684 #else
685         struct flock lock;
686
687         lock.l_type = F_WRLCK;  /* exclusive write lock */
688         lock.l_len = 0; /* whole file */
689         lock.l_start = 0;   /* start of file */
690         lock.l_whence = SEEK_SET;   /* end of file */
691
692         rcstat = fcntl(file->fd, F_SETLK, &lock);
693 #endif
694     }
695
696     return (rcstat);
697 }
698
699 /*
700  * Allocate some important arrays used, and initialize the template.
701  *
702  * When it returns, either all of the structures are allocated
703  * or none of them are.
704  *
705  * Returns 0 on success, -1 on error.
706  */
707 static int allocate_data_structures(
708     rrd_t *rrd,
709     char ***updvals,
710     rrd_value_t **pdp_temp,
711     const char *tmplt,
712     long **tmpl_idx,
713     unsigned long *tmpl_cnt,
714     unsigned long **rra_step_cnt,
715     unsigned long **skip_update,
716     rrd_value_t **pdp_new)
717 {
718     unsigned  i, ii;
719     if ((*updvals = (char **) malloc(sizeof(char *)
720                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
721         rrd_set_error("allocating updvals pointer array.");
722         return -1;
723     }
724     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
725                                             * rrd->stat_head->ds_cnt)) ==
726         NULL) {
727         rrd_set_error("allocating pdp_temp.");
728         goto err_free_updvals;
729     }
730     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
731                                                  *
732                                                  rrd->stat_head->rra_cnt)) ==
733         NULL) {
734         rrd_set_error("allocating skip_update.");
735         goto err_free_pdp_temp;
736     }
737     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
738                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
739         rrd_set_error("allocating tmpl_idx.");
740         goto err_free_skip_update;
741     }
742     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
743                                                   *
744                                                   (rrd->stat_head->
745                                                    rra_cnt))) == NULL) {
746         rrd_set_error("allocating rra_step_cnt.");
747         goto err_free_tmpl_idx;
748     }
749
750     /* initialize tmplt redirector */
751     /* default config example (assume DS 1 is a CDEF DS)
752        tmpl_idx[0] -> 0; (time)
753        tmpl_idx[1] -> 1; (DS 0)
754        tmpl_idx[2] -> 3; (DS 2)
755        tmpl_idx[3] -> 4; (DS 3) */
756     (*tmpl_idx)[0] = 0; /* time */
757     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
758         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
759             (*tmpl_idx)[ii++] = i;
760     }
761     *tmpl_cnt = ii;
762
763     if (tmplt != NULL) {
764         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
765             goto err_free_rra_step_cnt;
766         }
767     }
768
769     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
770                                            * rrd->stat_head->ds_cnt)) == NULL) {
771         rrd_set_error("allocating pdp_new.");
772         goto err_free_rra_step_cnt;
773     }
774
775     return 0;
776
777   err_free_rra_step_cnt:
778     free(*rra_step_cnt);
779   err_free_tmpl_idx:
780     free(*tmpl_idx);
781   err_free_skip_update:
782     free(*skip_update);
783   err_free_pdp_temp:
784     free(*pdp_temp);
785   err_free_updvals:
786     free(*updvals);
787     return -1;
788 }
789
790 /*
791  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
792  *
793  * Returns 0 on success.
794  */
795 static int parse_template(
796     rrd_t *rrd,
797     const char *tmplt,
798     unsigned long *tmpl_cnt,
799     long *tmpl_idx)
800 {
801     char     *dsname, *tmplt_copy;
802     unsigned int tmpl_len, i;
803     int       ret = 0;
804
805     *tmpl_cnt = 1;      /* the first entry is the time */
806
807     /* we should work on a writeable copy here */
808     if ((tmplt_copy = strdup(tmplt)) == NULL) {
809         rrd_set_error("error copying tmplt '%s'", tmplt);
810         ret = -1;
811         goto out;
812     }
813
814     dsname = tmplt_copy;
815     tmpl_len = strlen(tmplt_copy);
816     for (i = 0; i <= tmpl_len; i++) {
817         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
818             tmplt_copy[i] = '\0';
819             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
820                 rrd_set_error("tmplt contains more DS definitions than RRD");
821                 ret = -1;
822                 goto out_free_tmpl_copy;
823             }
824             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
825                 rrd_set_error("unknown DS name '%s'", dsname);
826                 ret = -1;
827                 goto out_free_tmpl_copy;
828             }
829             /* go to the next entry on the tmplt_copy */
830             if (i < tmpl_len)
831                 dsname = &tmplt_copy[i + 1];
832         }
833     }
834   out_free_tmpl_copy:
835     free(tmplt_copy);
836   out:
837     return ret;
838 }
839
840 /*
841  * Parse an update string, updates the primary data points (PDPs)
842  * and consolidated data points (CDPs), and writes changes to the RRAs.
843  *
844  * Returns 0 on success, -1 on error.
845  */
846 static int process_arg(
847     char *step_start,
848     rrd_t *rrd,
849     rrd_file_t *rrd_file,
850     unsigned long rra_begin,
851     unsigned long *rra_current,
852     time_t *current_time,
853     unsigned long *current_time_usec,
854     rrd_value_t *pdp_temp,
855     rrd_value_t *pdp_new,
856     unsigned long *rra_step_cnt,
857     char **updvals,
858     long *tmpl_idx,
859     unsigned long tmpl_cnt,
860     rrd_info_t ** pcdp_summary,
861     int version,
862     unsigned long *skip_update,
863     int *schedule_smooth)
864 {
865     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
866
867     /* a vector of future Holt-Winters seasonal coefs */
868     unsigned long elapsed_pdp_st;
869
870     double    interval, pre_int, post_int;  /* interval between this and
871                                              * the last run */
872     unsigned long proc_pdp_cnt;
873     unsigned long rra_start;
874
875     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
876                  current_time, current_time_usec, version) == -1) {
877         return -1;
878     }
879     /* seek to the beginning of the rra's */
880     if (*rra_current != rra_begin) {
881 #ifndef HAVE_MMAP
882         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
883             rrd_set_error("seek error in rrd");
884             return -1;
885         }
886 #endif
887         *rra_current = rra_begin;
888     }
889     rra_start = rra_begin;
890
891     interval = (double) (*current_time - rrd->live_head->last_up)
892         + (double) ((long) *current_time_usec -
893                     (long) rrd->live_head->last_up_usec) / 1e6f;
894
895     /* process the data sources and update the pdp_prep 
896      * area accordingly */
897     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
898         return -1;
899     }
900
901     elapsed_pdp_st = calculate_elapsed_steps(rrd,
902                                              *current_time,
903                                              *current_time_usec, interval,
904                                              &pre_int, &post_int,
905                                              &proc_pdp_cnt);
906
907     /* has a pdp_st moment occurred since the last run ? */
908     if (elapsed_pdp_st == 0) {
909         /* no we have not passed a pdp_st moment. therefore update is simple */
910         simple_update(rrd, interval, pdp_new);
911     } else {
912         /* an pdp_st has occurred. */
913         if (process_all_pdp_st(rrd, interval,
914                                pre_int, post_int,
915                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
916             return -1;
917         }
918         if (update_all_cdp_prep(rrd, rra_step_cnt,
919                                 rra_begin, rrd_file,
920                                 elapsed_pdp_st,
921                                 proc_pdp_cnt,
922                                 &last_seasonal_coef,
923                                 &seasonal_coef,
924                                 pdp_temp, rra_current,
925                                 skip_update, schedule_smooth) == -1) {
926             goto err_free_coefficients;
927         }
928         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
929                                  elapsed_pdp_st, pdp_temp,
930                                  &seasonal_coef) == -1) {
931             goto err_free_coefficients;
932         }
933         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
934                           rra_current, *current_time, skip_update,
935                           pcdp_summary) == -1) {
936             goto err_free_coefficients;
937         }
938     }                   /* endif a pdp_st has occurred */
939     rrd->live_head->last_up = *current_time;
940     rrd->live_head->last_up_usec = *current_time_usec;
941
942     if (version < 3) {
943         *rrd->legacy_last_up = rrd->live_head->last_up;
944     }
945     free(seasonal_coef);
946     free(last_seasonal_coef);
947     return 0;
948
949   err_free_coefficients:
950     free(seasonal_coef);
951     free(last_seasonal_coef);
952     return -1;
953 }
954
955 /*
956  * Parse a DS string (time + colon-separated values), storing the
957  * results in current_time, current_time_usec, and updvals.
958  *
959  * Returns 0 on success, -1 on error.
960  */
961 static int parse_ds(
962     rrd_t *rrd,
963     char **updvals,
964     long *tmpl_idx,
965     char *input,
966     unsigned long tmpl_cnt,
967     time_t *current_time,
968     unsigned long *current_time_usec,
969     int version)
970 {
971     char     *p;
972     unsigned long i;
973     char      timesyntax;
974
975     updvals[0] = input;
976     /* initialize all ds input to unknown except the first one
977        which has always got to be set */
978     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
979         updvals[i] = "U";
980
981     /* separate all ds elements; first must be examined separately
982        due to alternate time syntax */
983     if ((p = strchr(input, '@')) != NULL) {
984         timesyntax = '@';
985     } else if ((p = strchr(input, ':')) != NULL) {
986         timesyntax = ':';
987     } else {
988         rrd_set_error("expected timestamp not found in data source from %s",
989                       input);
990         return -1;
991     }
992     *p = '\0';
993     i = 1;
994     updvals[tmpl_idx[i++]] = p + 1;
995     while (*(++p)) {
996         if (*p == ':') {
997             *p = '\0';
998             if (i < tmpl_cnt) {
999                 updvals[tmpl_idx[i++]] = p + 1;
1000             }
1001         }
1002     }
1003
1004     if (i != tmpl_cnt) {
1005         rrd_set_error("expected %lu data source readings (got %lu) from %s",
1006                       tmpl_cnt - 1, i, input);
1007         return -1;
1008     }
1009
1010     if (get_time_from_reading(rrd, timesyntax, updvals,
1011                               current_time, current_time_usec,
1012                               version) == -1) {
1013         return -1;
1014     }
1015     return 0;
1016 }
1017
1018 /*
1019  * Parse the time in a DS string, store it in current_time and 
1020  * current_time_usec and verify that it's later than the last
1021  * update for this DS.
1022  *
1023  * Returns 0 on success, -1 on error.
1024  */
1025 static int get_time_from_reading(
1026     rrd_t *rrd,
1027     char timesyntax,
1028     char **updvals,
1029     time_t *current_time,
1030     unsigned long *current_time_usec,
1031     int version)
1032 {
1033     double    tmp;
1034     char     *parsetime_error = NULL;
1035     char     *old_locale;
1036     rrd_time_value_t ds_tv;
1037     struct timeval tmp_time;    /* used for time conversion */
1038
1039     /* get the time from the reading ... handle N */
1040     if (timesyntax == '@') {    /* at-style */
1041         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1042             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1043             return -1;
1044         }
1045         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1046             ds_tv.type == RELATIVE_TO_START_TIME) {
1047             rrd_set_error("specifying time relative to the 'start' "
1048                           "or 'end' makes no sense here: %s", updvals[0]);
1049             return -1;
1050         }
1051         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1052         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1053     } else if (strcmp(updvals[0], "N") == 0) {
1054         gettimeofday(&tmp_time, 0);
1055         normalize_time(&tmp_time);
1056         *current_time = tmp_time.tv_sec;
1057         *current_time_usec = tmp_time.tv_usec;
1058     } else {
1059         old_locale = setlocale(LC_NUMERIC, "C");
1060         tmp = strtod(updvals[0], 0);
1061         setlocale(LC_NUMERIC, old_locale);
1062         *current_time = floor(tmp);
1063         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1064     }
1065     /* dont do any correction for old version RRDs */
1066     if (version < 3)
1067         *current_time_usec = 0;
1068
1069     if (*current_time < rrd->live_head->last_up ||
1070         (*current_time == rrd->live_head->last_up &&
1071          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1072         rrd_set_error("illegal attempt to update using time %ld when "
1073                       "last update time is %ld (minimum one second step)",
1074                       *current_time, rrd->live_head->last_up);
1075         return -1;
1076     }
1077     return 0;
1078 }
1079
1080 /*
1081  * Update pdp_new by interpreting the updvals according to the DS type
1082  * (COUNTER, GAUGE, etc.).
1083  *
1084  * Returns 0 on success, -1 on error.
1085  */
1086 static int update_pdp_prep(
1087     rrd_t *rrd,
1088     char **updvals,
1089     rrd_value_t *pdp_new,
1090     double interval)
1091 {
1092     unsigned long ds_idx;
1093     int       ii;
1094     char     *endptr;   /* used in the conversion */
1095     double    rate;
1096     char     *old_locale;
1097     enum dst_en dst_idx;
1098
1099     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1100         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1101
1102         /* make sure we do not build diffs with old last_ds values */
1103         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1104             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1105             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1106         }
1107
1108         /* NOTE: DST_CDEF should never enter this if block, because
1109          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1110          * accidently specified a value for the DST_CDEF. To handle this case,
1111          * an extra check is required. */
1112
1113         if ((updvals[ds_idx + 1][0] != 'U') &&
1114             (dst_idx != DST_CDEF) &&
1115             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1116             rate = DNAN;
1117
1118             /* pdp_new contains rate * time ... eg the bytes transferred during
1119              * the interval. Doing it this way saves a lot of math operations
1120              */
1121             switch (dst_idx) {
1122             case DST_COUNTER:
1123             case DST_DERIVE:
1124                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1125                     if ((updvals[ds_idx + 1][ii] < '0'
1126                          || updvals[ds_idx + 1][ii] > '9')
1127                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1128                         rrd_set_error("not a simple integer: '%s'",
1129                                       updvals[ds_idx + 1]);
1130                         return -1;
1131                     }
1132                 }
1133                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1134                     pdp_new[ds_idx] =
1135                         rrd_diff(updvals[ds_idx + 1],
1136                                  rrd->pdp_prep[ds_idx].last_ds);
1137                     if (dst_idx == DST_COUNTER) {
1138                         /* simple overflow catcher. This will fail
1139                          * terribly for non 32 or 64 bit counters
1140                          * ... are there any others in SNMP land?
1141                          */
1142                         if (pdp_new[ds_idx] < (double) 0.0)
1143                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1144                         if (pdp_new[ds_idx] < (double) 0.0)
1145                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1146                     }
1147                     rate = pdp_new[ds_idx] / interval;
1148                 } else {
1149                     pdp_new[ds_idx] = DNAN;
1150                 }
1151                 break;
1152             case DST_ABSOLUTE:
1153                 old_locale = setlocale(LC_NUMERIC, "C");
1154                 errno = 0;
1155                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1156                 setlocale(LC_NUMERIC, old_locale);
1157                 if (errno > 0) {
1158                     rrd_set_error("converting '%s' to float: %s",
1159                                   updvals[ds_idx + 1], rrd_strerror(errno));
1160                     return -1;
1161                 };
1162                 if (endptr[0] != '\0') {
1163                     rrd_set_error
1164                         ("conversion of '%s' to float not complete: tail '%s'",
1165                          updvals[ds_idx + 1], endptr);
1166                     return -1;
1167                 }
1168                 rate = pdp_new[ds_idx] / interval;
1169                 break;
1170             case DST_GAUGE:
1171                 errno = 0;
1172                 old_locale = setlocale(LC_NUMERIC, "C");
1173                 pdp_new[ds_idx] =
1174                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1175                 setlocale(LC_NUMERIC, old_locale);
1176                 if (errno) {
1177                     rrd_set_error("converting '%s' to float: %s",
1178                                   updvals[ds_idx + 1], rrd_strerror(errno));
1179                     return -1;
1180                 };
1181                 if (endptr[0] != '\0') {
1182                     rrd_set_error
1183                         ("conversion of '%s' to float not complete: tail '%s'",
1184                          updvals[ds_idx + 1], endptr);
1185                     return -1;
1186                 }
1187                 rate = pdp_new[ds_idx] / interval;
1188                 break;
1189             default:
1190                 rrd_set_error("rrd contains unknown DS type : '%s'",
1191                               rrd->ds_def[ds_idx].dst);
1192                 return -1;
1193             }
1194             /* break out of this for loop if the error string is set */
1195             if (rrd_test_error()) {
1196                 return -1;
1197             }
1198             /* make sure pdp_temp is neither too large or too small
1199              * if any of these occur it becomes unknown ...
1200              * sorry folks ... */
1201             if (!isnan(rate) &&
1202                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1203                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1204                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1205                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1206                 pdp_new[ds_idx] = DNAN;
1207             }
1208         } else {
1209             /* no news is news all the same */
1210             pdp_new[ds_idx] = DNAN;
1211         }
1212
1213
1214         /* make a copy of the command line argument for the next run */
1215 #ifdef DEBUG
1216         fprintf(stderr, "prep ds[%lu]\t"
1217                 "last_arg '%s'\t"
1218                 "this_arg '%s'\t"
1219                 "pdp_new %10.2f\n",
1220                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1221                 pdp_new[ds_idx]);
1222 #endif
1223         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1224                 LAST_DS_LEN - 1);
1225         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1226     }
1227     return 0;
1228 }
1229
1230 /*
1231  * How many PDP steps have elapsed since the last update? Returns the answer,
1232  * and stores the time between the last update and the last PDP in pre_time,
1233  * and the time between the last PDP and the current time in post_int.
1234  */
1235 static int calculate_elapsed_steps(
1236     rrd_t *rrd,
1237     unsigned long current_time,
1238     unsigned long current_time_usec,
1239     double interval,
1240     double *pre_int,
1241     double *post_int,
1242     unsigned long *proc_pdp_cnt)
1243 {
1244     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1245     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1246                                  * time */
1247     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1248                                  * when it was last updated */
1249     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1250
1251     /* when was the current pdp started */
1252     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1253     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1254
1255     /* when did the last pdp_st occur */
1256     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1257     occu_pdp_st = current_time - occu_pdp_age;
1258
1259     if (occu_pdp_st > proc_pdp_st) {
1260         /* OK we passed the pdp_st moment */
1261         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1262                                                                      * occurred before the latest
1263                                                                      * pdp_st moment*/
1264         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1265         *post_int = occu_pdp_age;   /* how much after it */
1266         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1267     } else {
1268         *pre_int = interval;
1269         *post_int = 0;
1270     }
1271
1272     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1273
1274 #ifdef DEBUG
1275     printf("proc_pdp_age %lu\t"
1276            "proc_pdp_st %lu\t"
1277            "occu_pfp_age %lu\t"
1278            "occu_pdp_st %lu\t"
1279            "int %lf\t"
1280            "pre_int %lf\t"
1281            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1282            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1283 #endif
1284
1285     /* compute the number of elapsed pdp_st moments */
1286     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1287 }
1288
1289 /*
1290  * Increment the PDP values by the values in pdp_new, or else initialize them.
1291  */
1292 static void simple_update(
1293     rrd_t *rrd,
1294     double interval,
1295     rrd_value_t *pdp_new)
1296 {
1297     int       i;
1298
1299     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1300         if (isnan(pdp_new[i])) {
1301             /* this is not really accurate if we use subsecond data arrival time
1302                should have thought of it when going subsecond resolution ...
1303                sorry next format change we will have it! */
1304             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1305                 floor(interval);
1306         } else {
1307             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1308                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1309             } else {
1310                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1311             }
1312         }
1313 #ifdef DEBUG
1314         fprintf(stderr,
1315                 "NO PDP  ds[%i]\t"
1316                 "value %10.2f\t"
1317                 "unkn_sec %5lu\n",
1318                 i,
1319                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1320                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1321 #endif
1322     }
1323 }
1324
1325 /*
1326  * Call process_pdp_st for each DS.
1327  *
1328  * Returns 0 on success, -1 on error.
1329  */
1330 static int process_all_pdp_st(
1331     rrd_t *rrd,
1332     double interval,
1333     double pre_int,
1334     double post_int,
1335     unsigned long elapsed_pdp_st,
1336     rrd_value_t *pdp_new,
1337     rrd_value_t *pdp_temp)
1338 {
1339     unsigned long ds_idx;
1340
1341     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1342        rate*seconds which occurred up to the last run.
1343        pdp_new[] contains rate*seconds from the latest run.
1344        pdp_temp[] will contain the rate for cdp */
1345
1346     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1347         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1348                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1349                            pdp_new, pdp_temp) == -1) {
1350             return -1;
1351         }
1352 #ifdef DEBUG
1353         fprintf(stderr, "PDP UPD ds[%lu]\t"
1354                 "elapsed_pdp_st %lu\t"
1355                 "pdp_temp %10.2f\t"
1356                 "new_prep %10.2f\t"
1357                 "new_unkn_sec %5lu\n",
1358                 ds_idx,
1359                 elapsed_pdp_st,
1360                 pdp_temp[ds_idx],
1361                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1362                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1363 #endif
1364     }
1365     return 0;
1366 }
1367
1368 /*
1369  * Process an update that occurs after one of the PDP moments.
1370  * Increments the PDP value, sets NAN if time greater than the
1371  * heartbeats have elapsed, processes CDEFs.
1372  *
1373  * Returns 0 on success, -1 on error.
1374  */
1375 static int process_pdp_st(
1376     rrd_t *rrd,
1377     unsigned long ds_idx,
1378     double interval,
1379     double pre_int,
1380     double post_int,
1381     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1382     rrd_value_t *pdp_new,
1383     rrd_value_t *pdp_temp)
1384 {
1385     int       i;
1386
1387     /* update pdp_prep to the current pdp_st. */
1388     double    pre_unknown = 0.0;
1389     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1390     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1391
1392     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1393
1394     rpnstack_init(&rpnstack);
1395
1396
1397     if (isnan(pdp_new[ds_idx])) {
1398         /* a final bit of unknown to be added before calculation
1399            we use a temporary variable for this so that we
1400            don't have to turn integer lines before using the value */
1401         pre_unknown = pre_int;
1402     } else {
1403         if (isnan(scratch[PDP_val].u_val)) {
1404             scratch[PDP_val].u_val = 0;
1405         }
1406         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1407     }
1408
1409     /* if too much of the pdp_prep is unknown we dump it */
1410     /* if the interval is larger thatn mrhb we get NAN */
1411     if ((interval > mrhb) ||
1412         (rrd->stat_head->pdp_step / 2.0 <
1413          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1414         pdp_temp[ds_idx] = DNAN;
1415     } else {
1416         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1417             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1418              pre_unknown);
1419     }
1420
1421     /* process CDEF data sources; remember each CDEF DS can
1422      * only reference other DS with a lower index number */
1423     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1424         rpnp_t   *rpnp;
1425
1426         rpnp =
1427             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1428         /* substitute data values for OP_VARIABLE nodes */
1429         for (i = 0; rpnp[i].op != OP_END; i++) {
1430             if (rpnp[i].op == OP_VARIABLE) {
1431                 rpnp[i].op = OP_NUMBER;
1432                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1433             }
1434         }
1435         /* run the rpn calculator */
1436         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1437             free(rpnp);
1438             rpnstack_free(&rpnstack);
1439             return -1;
1440         }
1441     }
1442
1443     /* make pdp_prep ready for the next run */
1444     if (isnan(pdp_new[ds_idx])) {
1445         /* this is not realy accurate if we use subsecond data arival time
1446            should have thought of it when going subsecond resolution ...
1447            sorry next format change we will have it! */
1448         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1449         scratch[PDP_val].u_val = DNAN;
1450     } else {
1451         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1452         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1453     }
1454     rpnstack_free(&rpnstack);
1455     return 0;
1456 }
1457
1458 /*
1459  * Iterate over all the RRAs for a given DS and:
1460  * 1. Decide whether to schedule a smooth later
1461  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1462  * 3. Update the CDP
1463  *
1464  * Returns 0 on success, -1 on error
1465  */
1466 static int update_all_cdp_prep(
1467     rrd_t *rrd,
1468     unsigned long *rra_step_cnt,
1469     unsigned long rra_begin,
1470     rrd_file_t *rrd_file,
1471     unsigned long elapsed_pdp_st,
1472     unsigned long proc_pdp_cnt,
1473     rrd_value_t **last_seasonal_coef,
1474     rrd_value_t **seasonal_coef,
1475     rrd_value_t *pdp_temp,
1476     unsigned long *rra_current,
1477     unsigned long *skip_update,
1478     int *schedule_smooth)
1479 {
1480     unsigned long rra_idx;
1481
1482     /* index into the CDP scratch array */
1483     enum cf_en current_cf;
1484     unsigned long rra_start;
1485
1486     /* number of rows to be updated in an RRA for a data value. */
1487     unsigned long start_pdp_offset;
1488
1489     rra_start = rra_begin;
1490     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1491         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1492         start_pdp_offset =
1493             rrd->rra_def[rra_idx].pdp_cnt -
1494             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1495         skip_update[rra_idx] = 0;
1496         if (start_pdp_offset <= elapsed_pdp_st) {
1497             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1498                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1499         } else {
1500             rra_step_cnt[rra_idx] = 0;
1501         }
1502
1503         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1504             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1505              * so that they will be correct for the next observed value; note that for
1506              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1507              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1508             if (rra_step_cnt[rra_idx] > 1) {
1509                 skip_update[rra_idx] = 1;
1510                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1511                                 elapsed_pdp_st, last_seasonal_coef);
1512                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1513                                 elapsed_pdp_st + 1, seasonal_coef);
1514             }
1515             /* periodically run a smoother for seasonal effects */
1516             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1517 #ifdef DEBUG
1518                 fprintf(stderr,
1519                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1520                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1521                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1522                         u_cnt);
1523 #endif
1524                 *schedule_smooth = 1;
1525             }
1526             *rra_current = rrd_tell(rrd_file);
1527         }
1528         if (rrd_test_error())
1529             return -1;
1530
1531         if (update_cdp_prep
1532             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1533              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1534              current_cf) == -1) {
1535             return -1;
1536         }
1537         rra_start +=
1538             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1539             sizeof(rrd_value_t);
1540     }
1541     return 0;
1542 }
1543
1544 /* 
1545  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1546  */
1547 static int do_schedule_smooth(
1548     rrd_t *rrd,
1549     unsigned long rra_idx,
1550     unsigned long elapsed_pdp_st)
1551 {
1552     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1553     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1554     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1555     unsigned long seasonal_smooth_idx =
1556         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1557     unsigned long *init_seasonal =
1558         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1559
1560     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1561      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1562      * really an RRA level, not a data source within RRA level parameter, but
1563      * the rra_def is read only for rrd_update (not flushed to disk). */
1564     if (*init_seasonal > BURNIN_CYCLES) {
1565         /* someone has no doubt invented a trick to deal with this wrap around,
1566          * but at least this code is clear. */
1567         if (seasonal_smooth_idx > cur_row) {
1568             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1569              * between PDP and CDP */
1570             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1571         }
1572         /* can't rely on negative numbers because we are working with
1573          * unsigned values */
1574         return (cur_row + elapsed_pdp_st >= row_cnt
1575                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1576     }
1577     /* mark off one of the burn-in cycles */
1578     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1579 }
1580
1581 /*
1582  * For a given RRA, iterate over the data sources and call the appropriate
1583  * consolidation function.
1584  *
1585  * Returns 0 on success, -1 on error.
1586  */
1587 static int update_cdp_prep(
1588     rrd_t *rrd,
1589     unsigned long elapsed_pdp_st,
1590     unsigned long start_pdp_offset,
1591     unsigned long *rra_step_cnt,
1592     int rra_idx,
1593     rrd_value_t *pdp_temp,
1594     rrd_value_t *last_seasonal_coef,
1595     rrd_value_t *seasonal_coef,
1596     int current_cf)
1597 {
1598     unsigned long ds_idx, cdp_idx;
1599
1600     /* update CDP_PREP areas */
1601     /* loop over data soures within each RRA */
1602     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1603
1604         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1605
1606         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1607             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1608                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1609                        elapsed_pdp_st, start_pdp_offset,
1610                        rrd->rra_def[rra_idx].pdp_cnt,
1611                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1612                        rra_idx, ds_idx);
1613         } else {
1614             /* Nothing to consolidate if there's one PDP per CDP. However, if
1615              * we've missed some PDPs, let's update null counters etc. */
1616             if (elapsed_pdp_st > 2) {
1617                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1618                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1619                           current_cf);
1620             }
1621         }
1622
1623         if (rrd_test_error())
1624             return -1;
1625     }                   /* endif data sources loop */
1626     return 0;
1627 }
1628
1629 /*
1630  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1631  * primary value, secondary value, and # of unknowns.
1632  */
1633 static void update_cdp(
1634     unival *scratch,
1635     int current_cf,
1636     rrd_value_t pdp_temp_val,
1637     unsigned long rra_step_cnt,
1638     unsigned long elapsed_pdp_st,
1639     unsigned long start_pdp_offset,
1640     unsigned long pdp_cnt,
1641     rrd_value_t xff,
1642     int i,
1643     int ii)
1644 {
1645     /* shorthand variables */
1646     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1647     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1648     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1649     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1650
1651     if (rra_step_cnt) {
1652         /* If we are in this block, as least 1 CDP value will be written to
1653          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1654          * to be written, then the "fill in" value is the CDP_secondary_val
1655          * entry. */
1656         if (isnan(pdp_temp_val)) {
1657             *cdp_unkn_pdp_cnt += start_pdp_offset;
1658             *cdp_secondary_val = DNAN;
1659         } else {
1660             /* CDP_secondary value is the RRA "fill in" value for intermediary
1661              * CDP data entries. No matter the CF, the value is the same because
1662              * the average, max, min, and last of a list of identical values is
1663              * the same, namely, the value itself. */
1664             *cdp_secondary_val = pdp_temp_val;
1665         }
1666
1667         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1668             *cdp_primary_val = DNAN;
1669             if (current_cf == CF_AVERAGE) {
1670                 *cdp_val =
1671                     initialize_average_carry_over(pdp_temp_val,
1672                                                   elapsed_pdp_st,
1673                                                   start_pdp_offset, pdp_cnt);
1674             } else {
1675                 *cdp_val = pdp_temp_val;
1676             }
1677         } else {
1678             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1679                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1680         }               /* endif meets xff value requirement for a valid value */
1681         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1682          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1683         if (isnan(pdp_temp_val))
1684             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1685         else
1686             *cdp_unkn_pdp_cnt = 0;
1687     } else {            /* rra_step_cnt[i]  == 0 */
1688
1689 #ifdef DEBUG
1690         if (isnan(*cdp_val)) {
1691             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1692                     i, ii);
1693         } else {
1694             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1695                     i, ii, *cdp_val);
1696         }
1697 #endif
1698         if (isnan(pdp_temp_val)) {
1699             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1700         } else {
1701             *cdp_val =
1702                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1703                                   current_cf, i, ii);
1704         }
1705     }
1706 }
1707
1708 /*
1709  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1710  * on the type of consolidation function.
1711  */
1712 static void initialize_cdp_val(
1713     unival *scratch,
1714     int current_cf,
1715     rrd_value_t pdp_temp_val,
1716     unsigned long elapsed_pdp_st,
1717     unsigned long start_pdp_offset,
1718     unsigned long pdp_cnt)
1719 {
1720     rrd_value_t cum_val, cur_val;
1721
1722     switch (current_cf) {
1723     case CF_AVERAGE:
1724         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1725         cur_val = IFDNAN(pdp_temp_val, 0.0);
1726         scratch[CDP_primary_val].u_val =
1727             (cum_val + cur_val * start_pdp_offset) /
1728             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1729         scratch[CDP_val].u_val =
1730             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1731                                           start_pdp_offset, pdp_cnt);
1732         break;
1733     case CF_MAXIMUM:
1734         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1735         cur_val = IFDNAN(pdp_temp_val, -DINF);
1736 #if 0
1737 #ifdef DEBUG
1738         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1739             fprintf(stderr,
1740                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1741                     i, ii);
1742             exit(-1);
1743         }
1744 #endif
1745 #endif
1746         if (cur_val > cum_val)
1747             scratch[CDP_primary_val].u_val = cur_val;
1748         else
1749             scratch[CDP_primary_val].u_val = cum_val;
1750         /* initialize carry over value */
1751         scratch[CDP_val].u_val = pdp_temp_val;
1752         break;
1753     case CF_MINIMUM:
1754         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1755         cur_val = IFDNAN(pdp_temp_val, DINF);
1756 #if 0
1757 #ifdef DEBUG
1758         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1759             fprintf(stderr,
1760                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1761                     ii);
1762             exit(-1);
1763         }
1764 #endif
1765 #endif
1766         if (cur_val < cum_val)
1767             scratch[CDP_primary_val].u_val = cur_val;
1768         else
1769             scratch[CDP_primary_val].u_val = cum_val;
1770         /* initialize carry over value */
1771         scratch[CDP_val].u_val = pdp_temp_val;
1772         break;
1773     case CF_LAST:
1774     default:
1775         scratch[CDP_primary_val].u_val = pdp_temp_val;
1776         /* initialize carry over value */
1777         scratch[CDP_val].u_val = pdp_temp_val;
1778         break;
1779     }
1780 }
1781
1782 /*
1783  * Update the consolidation function for Holt-Winters functions as
1784  * well as other functions that don't actually consolidate multiple
1785  * PDPs.
1786  */
1787 static void reset_cdp(
1788     rrd_t *rrd,
1789     unsigned long elapsed_pdp_st,
1790     rrd_value_t *pdp_temp,
1791     rrd_value_t *last_seasonal_coef,
1792     rrd_value_t *seasonal_coef,
1793     int rra_idx,
1794     int ds_idx,
1795     int cdp_idx,
1796     enum cf_en current_cf)
1797 {
1798     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1799
1800     switch (current_cf) {
1801     case CF_AVERAGE:
1802     default:
1803         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1804         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1805         break;
1806     case CF_SEASONAL:
1807     case CF_DEVSEASONAL:
1808         /* need to update cached seasonal values, so they are consistent
1809          * with the bulk update */
1810         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1811          * CDP_last_deviation are the same. */
1812         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1813         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1814         break;
1815     case CF_HWPREDICT:
1816     case CF_MHWPREDICT:
1817         /* need to update the null_count and last_null_count.
1818          * even do this for non-DNAN pdp_temp because the
1819          * algorithm is not learning from batch updates. */
1820         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1821         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1822         /* fall through */
1823     case CF_DEVPREDICT:
1824         scratch[CDP_primary_val].u_val = DNAN;
1825         scratch[CDP_secondary_val].u_val = DNAN;
1826         break;
1827     case CF_FAILURES:
1828         /* do not count missed bulk values as failures */
1829         scratch[CDP_primary_val].u_val = 0;
1830         scratch[CDP_secondary_val].u_val = 0;
1831         /* need to reset violations buffer.
1832          * could do this more carefully, but for now, just
1833          * assume a bulk update wipes away all violations. */
1834         erase_violations(rrd, cdp_idx, rra_idx);
1835         break;
1836     }
1837 }
1838
1839 static rrd_value_t initialize_average_carry_over(
1840     rrd_value_t pdp_temp_val,
1841     unsigned long elapsed_pdp_st,
1842     unsigned long start_pdp_offset,
1843     unsigned long pdp_cnt)
1844 {
1845     /* initialize carry over value */
1846     if (isnan(pdp_temp_val)) {
1847         return DNAN;
1848     }
1849     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1850 }
1851
1852 /*
1853  * Update or initialize a CDP value based on the consolidation
1854  * function.
1855  *
1856  * Returns the new value.
1857  */
1858 static rrd_value_t calculate_cdp_val(
1859     rrd_value_t cdp_val,
1860     rrd_value_t pdp_temp_val,
1861     unsigned long elapsed_pdp_st,
1862     int current_cf,
1863 #ifdef DEBUG
1864     int i,
1865     int ii
1866 #else
1867     int UNUSED(i),
1868     int UNUSED(ii)
1869 #endif
1870     )
1871 {
1872     if (isnan(cdp_val)) {
1873         if (current_cf == CF_AVERAGE) {
1874             pdp_temp_val *= elapsed_pdp_st;
1875         }
1876 #ifdef DEBUG
1877         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1878                 i, ii, pdp_temp_val);
1879 #endif
1880         return pdp_temp_val;
1881     }
1882     if (current_cf == CF_AVERAGE)
1883         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1884     if (current_cf == CF_MINIMUM)
1885         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1886     if (current_cf == CF_MAXIMUM)
1887         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1888
1889     return pdp_temp_val;
1890 }
1891
1892 /*
1893  * For each RRA, update the seasonal values and then call update_aberrant_CF
1894  * for each data source.
1895  *
1896  * Return 0 on success, -1 on error.
1897  */
1898 static int update_aberrant_cdps(
1899     rrd_t *rrd,
1900     rrd_file_t *rrd_file,
1901     unsigned long rra_begin,
1902     unsigned long *rra_current,
1903     unsigned long elapsed_pdp_st,
1904     rrd_value_t *pdp_temp,
1905     rrd_value_t **seasonal_coef)
1906 {
1907     unsigned long rra_idx, ds_idx, j;
1908
1909     /* number of PDP steps since the last update that
1910      * are assigned to the first CDP to be generated
1911      * since the last update. */
1912     unsigned short scratch_idx;
1913     unsigned long rra_start;
1914     enum cf_en current_cf;
1915
1916     /* this loop is only entered if elapsed_pdp_st < 3 */
1917     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1918          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1919         rra_start = rra_begin;
1920         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1921             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1922                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1923                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1924                     if (scratch_idx == CDP_primary_val) {
1925                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1926                                         elapsed_pdp_st + 1, seasonal_coef);
1927                     } else {
1928                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1929                                         elapsed_pdp_st + 2, seasonal_coef);
1930                     }
1931                     *rra_current = rrd_tell(rrd_file);
1932                 }
1933                 if (rrd_test_error())
1934                     return -1;
1935                 /* loop over data soures within each RRA */
1936                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1937                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1938                                        rra_idx * (rrd->stat_head->ds_cnt) +
1939                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1940                                        *seasonal_coef);
1941                 }
1942             }
1943             rra_start += rrd->rra_def[rra_idx].row_cnt
1944                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1945         }
1946     }
1947     return 0;
1948 }
1949
1950 /* 
1951  * Move sequentially through the file, writing one RRA at a time.  Note this
1952  * architecture divorces the computation of CDP with flushing updated RRA
1953  * entries to disk.
1954  *
1955  * Return 0 on success, -1 on error.
1956  */
1957 static int write_to_rras(
1958     rrd_t *rrd,
1959     rrd_file_t *rrd_file,
1960     unsigned long *rra_step_cnt,
1961     unsigned long rra_begin,
1962     unsigned long *rra_current,
1963     time_t current_time,
1964     unsigned long *skip_update,
1965     rrd_info_t ** pcdp_summary)
1966 {
1967     unsigned long rra_idx;
1968     unsigned long rra_start;
1969     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1970     time_t    rra_time = 0; /* time of update for a RRA */
1971
1972     /* Ready to write to disk */
1973     rra_start = rra_begin;
1974     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1975         /* skip unless there's something to write */
1976         if (rra_step_cnt[rra_idx]) {
1977             /* write the first row */
1978 #ifdef DEBUG
1979             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1980 #endif
1981             rrd->rra_ptr[rra_idx].cur_row++;
1982             if (rrd->rra_ptr[rra_idx].cur_row >=
1983                 rrd->rra_def[rra_idx].row_cnt)
1984                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1985             /* position on the first row */
1986             rra_pos_tmp = rra_start +
1987                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1988                 sizeof(rrd_value_t);
1989             if (rra_pos_tmp != *rra_current) {
1990                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1991                     rrd_set_error("seek error in rrd");
1992                     return -1;
1993                 }
1994                 *rra_current = rra_pos_tmp;
1995             }
1996 #ifdef DEBUG
1997             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1998 #endif
1999             if (!skip_update[rra_idx]) {
2000                 if (*pcdp_summary != NULL) {
2001                     rra_time = (current_time - current_time
2002                                 % (rrd->rra_def[rra_idx].pdp_cnt *
2003                                    rrd->stat_head->pdp_step))
2004                         -
2005                         ((rra_step_cnt[rra_idx] -
2006                           1) * rrd->rra_def[rra_idx].pdp_cnt *
2007                          rrd->stat_head->pdp_step);
2008                 }
2009                 if (write_RRA_row
2010                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
2011                      pcdp_summary, rra_time) == -1)
2012                     return -1;
2013             }
2014
2015             /* write other rows of the bulk update, if any */
2016             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
2017                 if (++rrd->rra_ptr[rra_idx].cur_row ==
2018                     rrd->rra_def[rra_idx].row_cnt) {
2019 #ifdef DEBUG
2020                     fprintf(stderr,
2021                             "Wraparound for RRA %s, %lu updates left\n",
2022                             rrd->rra_def[rra_idx].cf_nam,
2023                             rra_step_cnt[rra_idx] - 1);
2024 #endif
2025                     /* wrap */
2026                     rrd->rra_ptr[rra_idx].cur_row = 0;
2027                     /* seek back to beginning of current rra */
2028                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2029                         rrd_set_error("seek error in rrd");
2030                         return -1;
2031                     }
2032 #ifdef DEBUG
2033                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
2034                             rrd_file->pos);
2035 #endif
2036                     *rra_current = rra_start;
2037                 }
2038                 if (!skip_update[rra_idx]) {
2039                     if (*pcdp_summary != NULL) {
2040                         rra_time = (current_time - current_time
2041                                     % (rrd->rra_def[rra_idx].pdp_cnt *
2042                                        rrd->stat_head->pdp_step))
2043                             -
2044                             ((rra_step_cnt[rra_idx] -
2045                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2046                              rrd->stat_head->pdp_step);
2047                     }
2048                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2049                                       CDP_secondary_val, pcdp_summary,
2050                                       rra_time) == -1)
2051                         return -1;
2052                 }
2053             }
2054         }
2055         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2056             sizeof(rrd_value_t);
2057     }                   /* RRA LOOP */
2058
2059     return 0;
2060 }
2061
2062 /*
2063  * Write out one row of values (one value per DS) to the archive.
2064  *
2065  * Returns 0 on success, -1 on error.
2066  */
2067 static int write_RRA_row(
2068     rrd_file_t *rrd_file,
2069     rrd_t *rrd,
2070     unsigned long rra_idx,
2071     unsigned long *rra_current,
2072     unsigned short CDP_scratch_idx,
2073     rrd_info_t ** pcdp_summary,
2074     time_t rra_time)
2075 {
2076     unsigned long ds_idx, cdp_idx;
2077     rrd_infoval_t iv;
2078
2079     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2080         /* compute the cdp index */
2081         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2082 #ifdef DEBUG
2083         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2084                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2085                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2086 #endif
2087         if (*pcdp_summary != NULL) {
2088             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2089             /* append info to the return hash */
2090             *pcdp_summary = rrd_info_push(*pcdp_summary,
2091                                           sprintf_alloc
2092                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2093                                            rrd->rra_def[rra_idx].cf_nam,
2094                                            rrd->rra_def[rra_idx].pdp_cnt,
2095                                            rrd->ds_def[ds_idx].ds_nam),
2096                                           RD_I_VAL, iv);
2097         }
2098         if (rrd_write(rrd_file,
2099                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2100                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2101             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2102             return -1;
2103         }
2104         *rra_current += sizeof(rrd_value_t);
2105     }
2106     return 0;
2107 }
2108
2109 /*
2110  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2111  *
2112  * Returns 0 on success, -1 otherwise
2113  */
2114 static int smooth_all_rras(
2115     rrd_t *rrd,
2116     rrd_file_t *rrd_file,
2117     unsigned long rra_begin)
2118 {
2119     unsigned long rra_start = rra_begin;
2120     unsigned long rra_idx;
2121
2122     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2123         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2124             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2125 #ifdef DEBUG
2126             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2127 #endif
2128             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2129             if (rrd_test_error())
2130                 return -1;
2131         }
2132         rra_start += rrd->rra_def[rra_idx].row_cnt
2133             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2134     }
2135     return 0;
2136 }
2137
2138 #ifndef HAVE_MMAP
2139 /*
2140  * Flush changes to disk (unless we're using mmap)
2141  *
2142  * Returns 0 on success, -1 otherwise
2143  */
2144 static int write_changes_to_disk(
2145     rrd_t *rrd,
2146     rrd_file_t *rrd_file,
2147     int version)
2148 {
2149     /* we just need to write back the live header portion now */
2150     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2151                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2152                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2153                  SEEK_SET) != 0) {
2154         rrd_set_error("seek rrd for live header writeback");
2155         return -1;
2156     }
2157     if (version >= 3) {
2158         if (rrd_write(rrd_file, rrd->live_head,
2159                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2160             rrd_set_error("rrd_write live_head to rrd");
2161             return -1;
2162         }
2163     } else {
2164         if (rrd_write(rrd_file, rrd->legacy_last_up,
2165                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2166             rrd_set_error("rrd_write live_head to rrd");
2167             return -1;
2168         }
2169     }
2170
2171
2172     if (rrd_write(rrd_file, rrd->pdp_prep,
2173                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2174         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2175         rrd_set_error("rrd_write pdp_prep to rrd");
2176         return -1;
2177     }
2178
2179     if (rrd_write(rrd_file, rrd->cdp_prep,
2180                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2181                   rrd->stat_head->ds_cnt)
2182         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2183                       rrd->stat_head->ds_cnt)) {
2184
2185         rrd_set_error("rrd_write cdp_prep to rrd");
2186         return -1;
2187     }
2188
2189     if (rrd_write(rrd_file, rrd->rra_ptr,
2190                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2191         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2192         rrd_set_error("rrd_write rra_ptr to rrd");
2193         return -1;
2194     }
2195     return 0;
2196 }
2197 #endif