src/rrd_daemon.c: Use static structs for the signal handler configuration.
[rrdtool.git] / src / rrd_update.c
1
2 /*****************************************************************************
3  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
4  *                Copyright by Florian Forster, 2008
5  *****************************************************************************
6  * rrd_update.c  RRD Update Function
7  *****************************************************************************
8  * $Id$
9  *****************************************************************************/
10
11 #include "rrd_tool.h"
12
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
18
19 #include <locale.h>
20
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
23
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
26
27 #include "rrd_client.h"
28
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
32  * replacement.
33  */
34 #include <sys/timeb.h>
35
36 #ifndef __MINGW32__
37 struct timeval {
38     time_t    tv_sec;   /* seconds */
39     long      tv_usec;  /* microseconds */
40 };
41 #endif
42
43 struct __timezone {
44     int       tz_minuteswest;   /* minutes W of Greenwich */
45     int       tz_dsttime;   /* type of dst correction */
46 };
47
48 static int gettimeofday(
49     struct timeval *t,
50     struct __timezone *tz)
51 {
52
53     struct _timeb current_time;
54
55     _ftime(&current_time);
56
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
59
60     return 0;
61 }
62
63 #endif
64
65 /* FUNCTION PROTOTYPES */
66
67 int       rrd_update_r(
68     const char *filename,
69     const char *tmplt,
70     int argc,
71     const char **argv);
72 int       _rrd_update(
73     const char *filename,
74     const char *tmplt,
75     int argc,
76     const char **argv,
77     rrd_info_t *);
78
79 static int allocate_data_structures(
80     rrd_t *rrd,
81     char ***updvals,
82     rrd_value_t **pdp_temp,
83     const char *tmplt,
84     long **tmpl_idx,
85     unsigned long *tmpl_cnt,
86     unsigned long **rra_step_cnt,
87     unsigned long **skip_update,
88     rrd_value_t **pdp_new);
89
90 static int parse_template(
91     rrd_t *rrd,
92     const char *tmplt,
93     unsigned long *tmpl_cnt,
94     long *tmpl_idx);
95
96 static int process_arg(
97     char *step_start,
98     rrd_t *rrd,
99     rrd_file_t *rrd_file,
100     unsigned long rra_begin,
101     unsigned long *rra_current,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
114
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
124
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
132
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
138
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
147
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
152
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
161
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
171
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *rra_current,
183     unsigned long *skip_update,
184     int *schedule_smooth);
185
186 static int do_schedule_smooth(
187     rrd_t *rrd,
188     unsigned long rra_idx,
189     unsigned long elapsed_pdp_st);
190
191 static int update_cdp_prep(
192     rrd_t *rrd,
193     unsigned long elapsed_pdp_st,
194     unsigned long start_pdp_offset,
195     unsigned long *rra_step_cnt,
196     int rra_idx,
197     rrd_value_t *pdp_temp,
198     rrd_value_t *last_seasonal_coef,
199     rrd_value_t *seasonal_coef,
200     int current_cf);
201
202 static void update_cdp(
203     unival *scratch,
204     int current_cf,
205     rrd_value_t pdp_temp_val,
206     unsigned long rra_step_cnt,
207     unsigned long elapsed_pdp_st,
208     unsigned long start_pdp_offset,
209     unsigned long pdp_cnt,
210     rrd_value_t xff,
211     int i,
212     int ii);
213
214 static void initialize_cdp_val(
215     unival *scratch,
216     int current_cf,
217     rrd_value_t pdp_temp_val,
218     unsigned long elapsed_pdp_st,
219     unsigned long start_pdp_offset,
220     unsigned long pdp_cnt);
221
222 static void reset_cdp(
223     rrd_t *rrd,
224     unsigned long elapsed_pdp_st,
225     rrd_value_t *pdp_temp,
226     rrd_value_t *last_seasonal_coef,
227     rrd_value_t *seasonal_coef,
228     int rra_idx,
229     int ds_idx,
230     int cdp_idx,
231     enum cf_en current_cf);
232
233 static rrd_value_t initialize_average_carry_over(
234     rrd_value_t pdp_temp_val,
235     unsigned long elapsed_pdp_st,
236     unsigned long start_pdp_offset,
237     unsigned long pdp_cnt);
238
239 static rrd_value_t calculate_cdp_val(
240     rrd_value_t cdp_val,
241     rrd_value_t pdp_temp_val,
242     unsigned long elapsed_pdp_st,
243     int current_cf,
244     int i,
245     int ii);
246
247 static int update_aberrant_cdps(
248     rrd_t *rrd,
249     rrd_file_t *rrd_file,
250     unsigned long rra_begin,
251     unsigned long *rra_current,
252     unsigned long elapsed_pdp_st,
253     rrd_value_t *pdp_temp,
254     rrd_value_t **seasonal_coef);
255
256 static int write_to_rras(
257     rrd_t *rrd,
258     rrd_file_t *rrd_file,
259     unsigned long *rra_step_cnt,
260     unsigned long rra_begin,
261     unsigned long *rra_current,
262     time_t current_time,
263     unsigned long *skip_update,
264     rrd_info_t ** pcdp_summary);
265
266 static int write_RRA_row(
267     rrd_file_t *rrd_file,
268     rrd_t *rrd,
269     unsigned long rra_idx,
270     unsigned long *rra_current,
271     unsigned short CDP_scratch_idx,
272     rrd_info_t ** pcdp_summary,
273     time_t rra_time);
274
275 static int smooth_all_rras(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     unsigned long rra_begin);
279
280 #ifndef HAVE_MMAP
281 static int write_changes_to_disk(
282     rrd_t *rrd,
283     rrd_file_t *rrd_file,
284     int version);
285 #endif
286
287 /*
288  * normalize time as returned by gettimeofday. usec part must
289  * be always >= 0
290  */
291 static inline void normalize_time(
292     struct timeval *t)
293 {
294     if (t->tv_usec < 0) {
295         t->tv_sec--;
296         t->tv_usec += 1e6L;
297     }
298 }
299
300 /*
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
303  */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
307     int version)
308 {
309     struct timeval tmp_time;    /* used for time conversion */
310
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
314     if (version >= 3) {
315         *current_time_usec = tmp_time.tv_usec;
316     } else {
317         *current_time_usec = 0;
318     }
319 }
320
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322
323 rrd_info_t *rrd_update_v(
324     int argc,
325     char **argv)
326 {
327     char     *tmplt = NULL;
328     rrd_info_t *result = NULL;
329     rrd_infoval_t rc;
330     struct option long_options[] = {
331         {"template", required_argument, 0, 't'},
332         {0, 0, 0, 0}
333     };
334
335     rc.u_int = -1;
336     optind = 0;
337     opterr = 0;         /* initialize getopt */
338
339     while (1) {
340         int       option_index = 0;
341         int       opt;
342
343         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344
345         if (opt == EOF)
346             break;
347
348         switch (opt) {
349         case 't':
350             tmplt = optarg;
351             break;
352
353         case '?':
354             rrd_set_error("unknown option '%s'", argv[optind - 1]);
355             goto end_tag;
356         }
357     }
358
359     /* need at least 2 arguments: filename, data. */
360     if (argc - optind < 2) {
361         rrd_set_error("Not enough arguments");
362         goto end_tag;
363     }
364     rc.u_int = 0;
365     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366     rc.u_int = _rrd_update(argv[optind], tmplt,
367                            argc - optind - 1,
368                            (const char **) (argv + optind + 1), result);
369     result->value.u_int = rc.u_int;
370   end_tag:
371     return result;
372 }
373
374 int rrd_update(
375     int argc,
376     char **argv)
377 {
378     struct option long_options[] = {
379         {"template", required_argument, 0, 't'},
380         {"daemon",   required_argument, 0, 'd'},
381         {0, 0, 0, 0}
382     };
383     int       option_index = 0;
384     int       opt;
385     char     *tmplt = NULL;
386     int       rc = -1;
387     char     *daemon = NULL;
388
389     optind = 0;
390     opterr = 0;         /* initialize getopt */
391
392     while (1) {
393         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
394
395         if (opt == EOF)
396             break;
397
398         switch (opt) {
399         case 't':
400             tmplt = strdup(optarg);
401             break;
402
403         case 'd':
404             if (daemon != NULL)
405                 free (daemon);
406             daemon = strdup (optarg);
407             if (daemon == NULL)
408             {
409                 rrd_set_error("strdup failed.");
410                 goto out;
411             }
412             break;
413
414         case '?':
415             rrd_set_error("unknown option '%s'", argv[optind - 1]);
416             goto out;
417         }
418     }
419
420     /* need at least 2 arguments: filename, data. */
421     if (argc - optind < 2) {
422         rrd_set_error("Not enough arguments");
423         goto out;
424     }
425
426     if ((tmplt != NULL) && (daemon != NULL))
427     {
428         rrd_set_error("The caching daemon cannot be used together with "
429                 "templates yet.");
430         goto out;
431     }
432
433     if ((tmplt == NULL) && (daemon == NULL))
434     {
435         char *temp;
436
437         temp = getenv (ENV_RRDCACHED_ADDRESS);
438         if (temp != NULL)
439         {
440             daemon = strdup (temp);
441             if (daemon == NULL)
442             {
443                 rrd_set_error("strdup failed.");
444                 goto out;
445             }
446         }
447     }
448
449     if (daemon != NULL)
450     {
451         int status;
452
453         status = rrdc_connect (daemon);
454         if (status != 0)
455         {
456             rrd_set_error("Unable to connect to daemon: %s",
457                     (status < 0)
458                     ? "Internal error"
459                     : rrd_strerror (status));
460             goto out;
461         }
462
463         status = rrdc_update (/* file = */ argv[optind],
464                 /* values_num = */ argc - optind - 1,
465                 /* values = */ (void *) (argv + optind + 1));
466         if (status != 0)
467         {
468             rrd_set_error("Failed sending the values to the daemon: %s",
469                     (status < 0)
470                     ? "Internal error"
471                     : rrd_strerror (status));
472         }
473
474         rrdc_disconnect ();
475         goto out;
476     } /* if (daemon != NULL) */
477
478     rc = rrd_update_r(argv[optind], tmplt,
479                       argc - optind - 1, (const char **) (argv + optind + 1));
480   out:
481     if (tmplt != NULL)
482     {
483         free(tmplt);
484         tmplt = NULL;
485     }
486     if (daemon != NULL)
487     {
488         free (daemon);
489         daemon = NULL;
490     }
491     return rc;
492 }
493
494 int rrd_update_r(
495     const char *filename,
496     const char *tmplt,
497     int argc,
498     const char **argv)
499 {
500     return _rrd_update(filename, tmplt, argc, argv, NULL);
501 }
502
503 int _rrd_update(
504     const char *filename,
505     const char *tmplt,
506     int argc,
507     const char **argv,
508     rrd_info_t * pcdp_summary)
509 {
510
511     int       arg_i = 2;
512
513     unsigned long rra_begin;    /* byte pointer to the rra
514                                  * area in the rrd file.  this
515                                  * pointer never changes value */
516     unsigned long rra_current;  /* byte pointer to the current write
517                                  * spot in the rrd file. */
518     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
519                              * to the existing entry */
520     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
521                              * to the cdp values */
522
523     long     *tmpl_idx; /* index representing the settings
524                          * transported by the tmplt index */
525     unsigned long tmpl_cnt = 2; /* time and data */
526     rrd_t     rrd;
527     time_t    current_time = 0;
528     unsigned long current_time_usec = 0;    /* microseconds part of current time */
529     char    **updvals;
530     int       schedule_smooth = 0;
531
532     /* number of elapsed PDP steps since last update */
533     unsigned long *rra_step_cnt = NULL;
534
535     int       version;  /* rrd version */
536     rrd_file_t *rrd_file;
537     char     *arg_copy; /* for processing the argv */
538     unsigned long *skip_update; /* RRAs to advance but not write */
539
540     /* need at least 1 arguments: data. */
541     if (argc < 1) {
542         rrd_set_error("Not enough arguments");
543         goto err_out;
544     }
545
546     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
547         goto err_free;
548     }
549     /* We are now at the beginning of the rra's */
550     rra_current = rra_begin = rrd_file->header_len;
551
552     version = atoi(rrd.stat_head->version);
553
554     initialize_time(&current_time, &current_time_usec, version);
555
556     /* get exclusive lock to whole file.
557      * lock gets removed when we close the file.
558      */
559     if (rrd_lock(rrd_file) != 0) {
560         rrd_set_error("could not lock RRD");
561         goto err_close;
562     }
563
564     if (allocate_data_structures(&rrd, &updvals,
565                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
566                                  &rra_step_cnt, &skip_update,
567                                  &pdp_new) == -1) {
568         goto err_close;
569     }
570
571     /* loop through the arguments. */
572     for (arg_i = 0; arg_i < argc; arg_i++) {
573         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
574             rrd_set_error("failed duplication argv entry");
575             break;
576         }
577         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
578                         &current_time, &current_time_usec, pdp_temp, pdp_new,
579                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
580                         &pcdp_summary, version, skip_update,
581                         &schedule_smooth) == -1) {
582             free(arg_copy);
583             break;
584         }
585         free(arg_copy);
586     }
587
588     free(rra_step_cnt);
589
590     /* if we got here and if there is an error and if the file has not been
591      * written to, then close things up and return. */
592     if (rrd_test_error()) {
593         goto err_free_structures;
594     }
595 #ifndef HAVE_MMAP
596     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
597         goto err_free_structures;
598     }
599 #endif
600
601     /* calling the smoothing code here guarantees at most one smoothing
602      * operation per rrd_update call. Unfortunately, it is possible with bulk
603      * updates, or a long-delayed update for smoothing to occur off-schedule.
604      * This really isn't critical except during the burn-in cycles. */
605     if (schedule_smooth) {
606         smooth_all_rras(&rrd, rrd_file, rra_begin);
607     }
608
609 /*    rrd_dontneed(rrd_file,&rrd); */
610     rrd_free(&rrd);
611     rrd_close(rrd_file);
612
613     free(pdp_new);
614     free(tmpl_idx);
615     free(pdp_temp);
616     free(skip_update);
617     free(updvals);
618     return 0;
619
620   err_free_structures:
621     free(pdp_new);
622     free(tmpl_idx);
623     free(pdp_temp);
624     free(skip_update);
625     free(updvals);
626   err_close:
627     rrd_close(rrd_file);
628   err_free:
629     rrd_free(&rrd);
630   err_out:
631     return -1;
632 }
633
634 /*
635  * get exclusive lock to whole file.
636  * lock gets removed when we close the file
637  *
638  * returns 0 on success
639  */
640 int rrd_lock(
641     rrd_file_t *file)
642 {
643     int       rcstat;
644
645     {
646 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
647         struct _stat st;
648
649         if (_fstat(file->fd, &st) == 0) {
650             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
651         } else {
652             rcstat = -1;
653         }
654 #else
655         struct flock lock;
656
657         lock.l_type = F_WRLCK;  /* exclusive write lock */
658         lock.l_len = 0; /* whole file */
659         lock.l_start = 0;   /* start of file */
660         lock.l_whence = SEEK_SET;   /* end of file */
661
662         rcstat = fcntl(file->fd, F_SETLK, &lock);
663 #endif
664     }
665
666     return (rcstat);
667 }
668
669 /*
670  * Allocate some important arrays used, and initialize the template.
671  *
672  * When it returns, either all of the structures are allocated
673  * or none of them are.
674  *
675  * Returns 0 on success, -1 on error.
676  */
677 static int allocate_data_structures(
678     rrd_t *rrd,
679     char ***updvals,
680     rrd_value_t **pdp_temp,
681     const char *tmplt,
682     long **tmpl_idx,
683     unsigned long *tmpl_cnt,
684     unsigned long **rra_step_cnt,
685     unsigned long **skip_update,
686     rrd_value_t **pdp_new)
687 {
688     unsigned  i, ii;
689     if ((*updvals = (char **) malloc(sizeof(char *)
690                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
691         rrd_set_error("allocating updvals pointer array.");
692         return -1;
693     }
694     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
695                                             * rrd->stat_head->ds_cnt)) ==
696         NULL) {
697         rrd_set_error("allocating pdp_temp.");
698         goto err_free_updvals;
699     }
700     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
701                                                  *
702                                                  rrd->stat_head->rra_cnt)) ==
703         NULL) {
704         rrd_set_error("allocating skip_update.");
705         goto err_free_pdp_temp;
706     }
707     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
708                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
709         rrd_set_error("allocating tmpl_idx.");
710         goto err_free_skip_update;
711     }
712     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
713                                                   *
714                                                   (rrd->stat_head->
715                                                    rra_cnt))) == NULL) {
716         rrd_set_error("allocating rra_step_cnt.");
717         goto err_free_tmpl_idx;
718     }
719
720     /* initialize tmplt redirector */
721     /* default config example (assume DS 1 is a CDEF DS)
722        tmpl_idx[0] -> 0; (time)
723        tmpl_idx[1] -> 1; (DS 0)
724        tmpl_idx[2] -> 3; (DS 2)
725        tmpl_idx[3] -> 4; (DS 3) */
726     (*tmpl_idx)[0] = 0; /* time */
727     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
728         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
729             (*tmpl_idx)[ii++] = i;
730     }
731     *tmpl_cnt = ii;
732
733     if (tmplt != NULL) {
734         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
735             goto err_free_rra_step_cnt;
736         }
737     }
738
739     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
740                                            * rrd->stat_head->ds_cnt)) == NULL) {
741         rrd_set_error("allocating pdp_new.");
742         goto err_free_rra_step_cnt;
743     }
744
745     return 0;
746
747   err_free_rra_step_cnt:
748     free(*rra_step_cnt);
749   err_free_tmpl_idx:
750     free(*tmpl_idx);
751   err_free_skip_update:
752     free(*skip_update);
753   err_free_pdp_temp:
754     free(*pdp_temp);
755   err_free_updvals:
756     free(*updvals);
757     return -1;
758 }
759
760 /*
761  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
762  *
763  * Returns 0 on success.
764  */
765 static int parse_template(
766     rrd_t *rrd,
767     const char *tmplt,
768     unsigned long *tmpl_cnt,
769     long *tmpl_idx)
770 {
771     char     *dsname, *tmplt_copy;
772     unsigned int tmpl_len, i;
773     int       ret = 0;
774
775     *tmpl_cnt = 1;      /* the first entry is the time */
776
777     /* we should work on a writeable copy here */
778     if ((tmplt_copy = strdup(tmplt)) == NULL) {
779         rrd_set_error("error copying tmplt '%s'", tmplt);
780         ret = -1;
781         goto out;
782     }
783
784     dsname = tmplt_copy;
785     tmpl_len = strlen(tmplt_copy);
786     for (i = 0; i <= tmpl_len; i++) {
787         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
788             tmplt_copy[i] = '\0';
789             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
790                 rrd_set_error("tmplt contains more DS definitions than RRD");
791                 ret = -1;
792                 goto out_free_tmpl_copy;
793             }
794             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
795                 rrd_set_error("unknown DS name '%s'", dsname);
796                 ret = -1;
797                 goto out_free_tmpl_copy;
798             }
799             /* go to the next entry on the tmplt_copy */
800             if (i < tmpl_len)
801                 dsname = &tmplt_copy[i + 1];
802         }
803     }
804   out_free_tmpl_copy:
805     free(tmplt_copy);
806   out:
807     return ret;
808 }
809
810 /*
811  * Parse an update string, updates the primary data points (PDPs)
812  * and consolidated data points (CDPs), and writes changes to the RRAs.
813  *
814  * Returns 0 on success, -1 on error.
815  */
816 static int process_arg(
817     char *step_start,
818     rrd_t *rrd,
819     rrd_file_t *rrd_file,
820     unsigned long rra_begin,
821     unsigned long *rra_current,
822     time_t *current_time,
823     unsigned long *current_time_usec,
824     rrd_value_t *pdp_temp,
825     rrd_value_t *pdp_new,
826     unsigned long *rra_step_cnt,
827     char **updvals,
828     long *tmpl_idx,
829     unsigned long tmpl_cnt,
830     rrd_info_t ** pcdp_summary,
831     int version,
832     unsigned long *skip_update,
833     int *schedule_smooth)
834 {
835     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
836
837     /* a vector of future Holt-Winters seasonal coefs */
838     unsigned long elapsed_pdp_st;
839
840     double    interval, pre_int, post_int;  /* interval between this and
841                                              * the last run */
842     unsigned long proc_pdp_cnt;
843     unsigned long rra_start;
844
845     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
846                  current_time, current_time_usec, version) == -1) {
847         return -1;
848     }
849     /* seek to the beginning of the rra's */
850     if (*rra_current != rra_begin) {
851 #ifndef HAVE_MMAP
852         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
853             rrd_set_error("seek error in rrd");
854             return -1;
855         }
856 #endif
857         *rra_current = rra_begin;
858     }
859     rra_start = rra_begin;
860
861     interval = (double) (*current_time - rrd->live_head->last_up)
862         + (double) ((long) *current_time_usec -
863                     (long) rrd->live_head->last_up_usec) / 1e6f;
864
865     /* process the data sources and update the pdp_prep 
866      * area accordingly */
867     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
868         return -1;
869     }
870
871     elapsed_pdp_st = calculate_elapsed_steps(rrd,
872                                              *current_time,
873                                              *current_time_usec, interval,
874                                              &pre_int, &post_int,
875                                              &proc_pdp_cnt);
876
877     /* has a pdp_st moment occurred since the last run ? */
878     if (elapsed_pdp_st == 0) {
879         /* no we have not passed a pdp_st moment. therefore update is simple */
880         simple_update(rrd, interval, pdp_new);
881     } else {
882         /* an pdp_st has occurred. */
883         if (process_all_pdp_st(rrd, interval,
884                                pre_int, post_int,
885                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
886             return -1;
887         }
888         if (update_all_cdp_prep(rrd, rra_step_cnt,
889                                 rra_begin, rrd_file,
890                                 elapsed_pdp_st,
891                                 proc_pdp_cnt,
892                                 &last_seasonal_coef,
893                                 &seasonal_coef,
894                                 pdp_temp, rra_current,
895                                 skip_update, schedule_smooth) == -1) {
896             goto err_free_coefficients;
897         }
898         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
899                                  elapsed_pdp_st, pdp_temp,
900                                  &seasonal_coef) == -1) {
901             goto err_free_coefficients;
902         }
903         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
904                           rra_current, *current_time, skip_update,
905                           pcdp_summary) == -1) {
906             goto err_free_coefficients;
907         }
908     }                   /* endif a pdp_st has occurred */
909     rrd->live_head->last_up = *current_time;
910     rrd->live_head->last_up_usec = *current_time_usec;
911
912     if (version < 3) {
913         *rrd->legacy_last_up = rrd->live_head->last_up;
914     }
915     free(seasonal_coef);
916     free(last_seasonal_coef);
917     return 0;
918
919   err_free_coefficients:
920     free(seasonal_coef);
921     free(last_seasonal_coef);
922     return -1;
923 }
924
925 /*
926  * Parse a DS string (time + colon-separated values), storing the
927  * results in current_time, current_time_usec, and updvals.
928  *
929  * Returns 0 on success, -1 on error.
930  */
931 static int parse_ds(
932     rrd_t *rrd,
933     char **updvals,
934     long *tmpl_idx,
935     char *input,
936     unsigned long tmpl_cnt,
937     time_t *current_time,
938     unsigned long *current_time_usec,
939     int version)
940 {
941     char     *p;
942     unsigned long i;
943     char      timesyntax;
944
945     updvals[0] = input;
946     /* initialize all ds input to unknown except the first one
947        which has always got to be set */
948     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
949         updvals[i] = "U";
950
951     /* separate all ds elements; first must be examined separately
952        due to alternate time syntax */
953     if ((p = strchr(input, '@')) != NULL) {
954         timesyntax = '@';
955     } else if ((p = strchr(input, ':')) != NULL) {
956         timesyntax = ':';
957     } else {
958         rrd_set_error("expected timestamp not found in data source from %s",
959                       input);
960         return -1;
961     }
962     *p = '\0';
963     i = 1;
964     updvals[tmpl_idx[i++]] = p + 1;
965     while (*(++p)) {
966         if (*p == ':') {
967             *p = '\0';
968             if (i < tmpl_cnt) {
969                 updvals[tmpl_idx[i++]] = p + 1;
970             }
971         }
972     }
973
974     if (i != tmpl_cnt) {
975         rrd_set_error("expected %lu data source readings (got %lu) from %s",
976                       tmpl_cnt - 1, i, input);
977         return -1;
978     }
979
980     if (get_time_from_reading(rrd, timesyntax, updvals,
981                               current_time, current_time_usec,
982                               version) == -1) {
983         return -1;
984     }
985     return 0;
986 }
987
988 /*
989  * Parse the time in a DS string, store it in current_time and 
990  * current_time_usec and verify that it's later than the last
991  * update for this DS.
992  *
993  * Returns 0 on success, -1 on error.
994  */
995 static int get_time_from_reading(
996     rrd_t *rrd,
997     char timesyntax,
998     char **updvals,
999     time_t *current_time,
1000     unsigned long *current_time_usec,
1001     int version)
1002 {
1003     double    tmp;
1004     char     *parsetime_error = NULL;
1005     char     *old_locale;
1006     rrd_time_value_t ds_tv;
1007     struct timeval tmp_time;    /* used for time conversion */
1008
1009     /* get the time from the reading ... handle N */
1010     if (timesyntax == '@') {    /* at-style */
1011         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1012             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1013             return -1;
1014         }
1015         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1016             ds_tv.type == RELATIVE_TO_START_TIME) {
1017             rrd_set_error("specifying time relative to the 'start' "
1018                           "or 'end' makes no sense here: %s", updvals[0]);
1019             return -1;
1020         }
1021         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1022         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1023     } else if (strcmp(updvals[0], "N") == 0) {
1024         gettimeofday(&tmp_time, 0);
1025         normalize_time(&tmp_time);
1026         *current_time = tmp_time.tv_sec;
1027         *current_time_usec = tmp_time.tv_usec;
1028     } else {
1029         old_locale = setlocale(LC_NUMERIC, "C");
1030         tmp = strtod(updvals[0], 0);
1031         setlocale(LC_NUMERIC, old_locale);
1032         *current_time = floor(tmp);
1033         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1034     }
1035     /* dont do any correction for old version RRDs */
1036     if (version < 3)
1037         *current_time_usec = 0;
1038
1039     if (*current_time < rrd->live_head->last_up ||
1040         (*current_time == rrd->live_head->last_up &&
1041          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1042         rrd_set_error("illegal attempt to update using time %ld when "
1043                       "last update time is %ld (minimum one second step)",
1044                       *current_time, rrd->live_head->last_up);
1045         return -1;
1046     }
1047     return 0;
1048 }
1049
1050 /*
1051  * Update pdp_new by interpreting the updvals according to the DS type
1052  * (COUNTER, GAUGE, etc.).
1053  *
1054  * Returns 0 on success, -1 on error.
1055  */
1056 static int update_pdp_prep(
1057     rrd_t *rrd,
1058     char **updvals,
1059     rrd_value_t *pdp_new,
1060     double interval)
1061 {
1062     unsigned long ds_idx;
1063     int       ii;
1064     char     *endptr;   /* used in the conversion */
1065     double    rate;
1066     char     *old_locale;
1067     enum dst_en dst_idx;
1068
1069     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1070         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1071
1072         /* make sure we do not build diffs with old last_ds values */
1073         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1074             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1075             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1076         }
1077
1078         /* NOTE: DST_CDEF should never enter this if block, because
1079          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1080          * accidently specified a value for the DST_CDEF. To handle this case,
1081          * an extra check is required. */
1082
1083         if ((updvals[ds_idx + 1][0] != 'U') &&
1084             (dst_idx != DST_CDEF) &&
1085             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1086             rate = DNAN;
1087
1088             /* pdp_new contains rate * time ... eg the bytes transferred during
1089              * the interval. Doing it this way saves a lot of math operations
1090              */
1091             switch (dst_idx) {
1092             case DST_COUNTER:
1093             case DST_DERIVE:
1094                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1095                     if ((updvals[ds_idx + 1][ii] < '0'
1096                          || updvals[ds_idx + 1][ii] > '9')
1097                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1098                         rrd_set_error("not a simple integer: '%s'",
1099                                       updvals[ds_idx + 1]);
1100                         return -1;
1101                     }
1102                 }
1103                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1104                     pdp_new[ds_idx] =
1105                         rrd_diff(updvals[ds_idx + 1],
1106                                  rrd->pdp_prep[ds_idx].last_ds);
1107                     if (dst_idx == DST_COUNTER) {
1108                         /* simple overflow catcher. This will fail
1109                          * terribly for non 32 or 64 bit counters
1110                          * ... are there any others in SNMP land?
1111                          */
1112                         if (pdp_new[ds_idx] < (double) 0.0)
1113                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1114                         if (pdp_new[ds_idx] < (double) 0.0)
1115                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1116                     }
1117                     rate = pdp_new[ds_idx] / interval;
1118                 } else {
1119                     pdp_new[ds_idx] = DNAN;
1120                 }
1121                 break;
1122             case DST_ABSOLUTE:
1123                 old_locale = setlocale(LC_NUMERIC, "C");
1124                 errno = 0;
1125                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1126                 setlocale(LC_NUMERIC, old_locale);
1127                 if (errno > 0) {
1128                     rrd_set_error("converting '%s' to float: %s",
1129                                   updvals[ds_idx + 1], rrd_strerror(errno));
1130                     return -1;
1131                 };
1132                 if (endptr[0] != '\0') {
1133                     rrd_set_error
1134                         ("conversion of '%s' to float not complete: tail '%s'",
1135                          updvals[ds_idx + 1], endptr);
1136                     return -1;
1137                 }
1138                 rate = pdp_new[ds_idx] / interval;
1139                 break;
1140             case DST_GAUGE:
1141                 errno = 0;
1142                 old_locale = setlocale(LC_NUMERIC, "C");
1143                 pdp_new[ds_idx] =
1144                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1145                 setlocale(LC_NUMERIC, old_locale);
1146                 if (errno) {
1147                     rrd_set_error("converting '%s' to float: %s",
1148                                   updvals[ds_idx + 1], rrd_strerror(errno));
1149                     return -1;
1150                 };
1151                 if (endptr[0] != '\0') {
1152                     rrd_set_error
1153                         ("conversion of '%s' to float not complete: tail '%s'",
1154                          updvals[ds_idx + 1], endptr);
1155                     return -1;
1156                 }
1157                 rate = pdp_new[ds_idx] / interval;
1158                 break;
1159             default:
1160                 rrd_set_error("rrd contains unknown DS type : '%s'",
1161                               rrd->ds_def[ds_idx].dst);
1162                 return -1;
1163             }
1164             /* break out of this for loop if the error string is set */
1165             if (rrd_test_error()) {
1166                 return -1;
1167             }
1168             /* make sure pdp_temp is neither too large or too small
1169              * if any of these occur it becomes unknown ...
1170              * sorry folks ... */
1171             if (!isnan(rate) &&
1172                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1173                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1174                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1175                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1176                 pdp_new[ds_idx] = DNAN;
1177             }
1178         } else {
1179             /* no news is news all the same */
1180             pdp_new[ds_idx] = DNAN;
1181         }
1182
1183
1184         /* make a copy of the command line argument for the next run */
1185 #ifdef DEBUG
1186         fprintf(stderr, "prep ds[%lu]\t"
1187                 "last_arg '%s'\t"
1188                 "this_arg '%s'\t"
1189                 "pdp_new %10.2f\n",
1190                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1191                 pdp_new[ds_idx]);
1192 #endif
1193         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1194                 LAST_DS_LEN - 1);
1195         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1196     }
1197     return 0;
1198 }
1199
1200 /*
1201  * How many PDP steps have elapsed since the last update? Returns the answer,
1202  * and stores the time between the last update and the last PDP in pre_time,
1203  * and the time between the last PDP and the current time in post_int.
1204  */
1205 static int calculate_elapsed_steps(
1206     rrd_t *rrd,
1207     unsigned long current_time,
1208     unsigned long current_time_usec,
1209     double interval,
1210     double *pre_int,
1211     double *post_int,
1212     unsigned long *proc_pdp_cnt)
1213 {
1214     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1215     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1216                                  * time */
1217     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1218                                  * when it was last updated */
1219     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1220
1221     /* when was the current pdp started */
1222     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1223     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1224
1225     /* when did the last pdp_st occur */
1226     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1227     occu_pdp_st = current_time - occu_pdp_age;
1228
1229     if (occu_pdp_st > proc_pdp_st) {
1230         /* OK we passed the pdp_st moment */
1231         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1232                                                                      * occurred before the latest
1233                                                                      * pdp_st moment*/
1234         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1235         *post_int = occu_pdp_age;   /* how much after it */
1236         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1237     } else {
1238         *pre_int = interval;
1239         *post_int = 0;
1240     }
1241
1242     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1243
1244 #ifdef DEBUG
1245     printf("proc_pdp_age %lu\t"
1246            "proc_pdp_st %lu\t"
1247            "occu_pfp_age %lu\t"
1248            "occu_pdp_st %lu\t"
1249            "int %lf\t"
1250            "pre_int %lf\t"
1251            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1252            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1253 #endif
1254
1255     /* compute the number of elapsed pdp_st moments */
1256     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1257 }
1258
1259 /*
1260  * Increment the PDP values by the values in pdp_new, or else initialize them.
1261  */
1262 static void simple_update(
1263     rrd_t *rrd,
1264     double interval,
1265     rrd_value_t *pdp_new)
1266 {
1267     int       i;
1268
1269     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1270         if (isnan(pdp_new[i])) {
1271             /* this is not really accurate if we use subsecond data arrival time
1272                should have thought of it when going subsecond resolution ...
1273                sorry next format change we will have it! */
1274             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1275                 floor(interval);
1276         } else {
1277             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1278                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1279             } else {
1280                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1281             }
1282         }
1283 #ifdef DEBUG
1284         fprintf(stderr,
1285                 "NO PDP  ds[%i]\t"
1286                 "value %10.2f\t"
1287                 "unkn_sec %5lu\n",
1288                 i,
1289                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1290                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1291 #endif
1292     }
1293 }
1294
1295 /*
1296  * Call process_pdp_st for each DS.
1297  *
1298  * Returns 0 on success, -1 on error.
1299  */
1300 static int process_all_pdp_st(
1301     rrd_t *rrd,
1302     double interval,
1303     double pre_int,
1304     double post_int,
1305     unsigned long elapsed_pdp_st,
1306     rrd_value_t *pdp_new,
1307     rrd_value_t *pdp_temp)
1308 {
1309     unsigned long ds_idx;
1310
1311     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1312        rate*seconds which occurred up to the last run.
1313        pdp_new[] contains rate*seconds from the latest run.
1314        pdp_temp[] will contain the rate for cdp */
1315
1316     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1317         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1318                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1319                            pdp_new, pdp_temp) == -1) {
1320             return -1;
1321         }
1322 #ifdef DEBUG
1323         fprintf(stderr, "PDP UPD ds[%lu]\t"
1324                 "elapsed_pdp_st %lu\t"
1325                 "pdp_temp %10.2f\t"
1326                 "new_prep %10.2f\t"
1327                 "new_unkn_sec %5lu\n",
1328                 ds_idx,
1329                 elapsed_pdp_st,
1330                 pdp_temp[ds_idx],
1331                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1332                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1333 #endif
1334     }
1335     return 0;
1336 }
1337
1338 /*
1339  * Process an update that occurs after one of the PDP moments.
1340  * Increments the PDP value, sets NAN if time greater than the
1341  * heartbeats have elapsed, processes CDEFs.
1342  *
1343  * Returns 0 on success, -1 on error.
1344  */
1345 static int process_pdp_st(
1346     rrd_t *rrd,
1347     unsigned long ds_idx,
1348     double interval,
1349     double pre_int,
1350     double post_int,
1351     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1352     rrd_value_t *pdp_new,
1353     rrd_value_t *pdp_temp)
1354 {
1355     int       i;
1356
1357     /* update pdp_prep to the current pdp_st. */
1358     double    pre_unknown = 0.0;
1359     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1360     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1361
1362     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1363
1364     rpnstack_init(&rpnstack);
1365
1366
1367     if (isnan(pdp_new[ds_idx])) {
1368         /* a final bit of unknown to be added before calculation
1369            we use a temporary variable for this so that we
1370            don't have to turn integer lines before using the value */
1371         pre_unknown = pre_int;
1372     } else {
1373         if (isnan(scratch[PDP_val].u_val)) {
1374             scratch[PDP_val].u_val = 0;
1375         }
1376         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1377     }
1378
1379     /* if too much of the pdp_prep is unknown we dump it */
1380     /* if the interval is larger thatn mrhb we get NAN */
1381     if ((interval > mrhb) ||
1382         (rrd->stat_head->pdp_step / 2.0 <
1383          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1384         pdp_temp[ds_idx] = DNAN;
1385     } else {
1386         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1387             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1388              pre_unknown);
1389     }
1390
1391     /* process CDEF data sources; remember each CDEF DS can
1392      * only reference other DS with a lower index number */
1393     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1394         rpnp_t   *rpnp;
1395
1396         rpnp =
1397             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1398         /* substitute data values for OP_VARIABLE nodes */
1399         for (i = 0; rpnp[i].op != OP_END; i++) {
1400             if (rpnp[i].op == OP_VARIABLE) {
1401                 rpnp[i].op = OP_NUMBER;
1402                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1403             }
1404         }
1405         /* run the rpn calculator */
1406         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1407             free(rpnp);
1408             rpnstack_free(&rpnstack);
1409             return -1;
1410         }
1411     }
1412
1413     /* make pdp_prep ready for the next run */
1414     if (isnan(pdp_new[ds_idx])) {
1415         /* this is not realy accurate if we use subsecond data arival time
1416            should have thought of it when going subsecond resolution ...
1417            sorry next format change we will have it! */
1418         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1419         scratch[PDP_val].u_val = DNAN;
1420     } else {
1421         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1422         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1423     }
1424     rpnstack_free(&rpnstack);
1425     return 0;
1426 }
1427
1428 /*
1429  * Iterate over all the RRAs for a given DS and:
1430  * 1. Decide whether to schedule a smooth later
1431  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1432  * 3. Update the CDP
1433  *
1434  * Returns 0 on success, -1 on error
1435  */
1436 static int update_all_cdp_prep(
1437     rrd_t *rrd,
1438     unsigned long *rra_step_cnt,
1439     unsigned long rra_begin,
1440     rrd_file_t *rrd_file,
1441     unsigned long elapsed_pdp_st,
1442     unsigned long proc_pdp_cnt,
1443     rrd_value_t **last_seasonal_coef,
1444     rrd_value_t **seasonal_coef,
1445     rrd_value_t *pdp_temp,
1446     unsigned long *rra_current,
1447     unsigned long *skip_update,
1448     int *schedule_smooth)
1449 {
1450     unsigned long rra_idx;
1451
1452     /* index into the CDP scratch array */
1453     enum cf_en current_cf;
1454     unsigned long rra_start;
1455
1456     /* number of rows to be updated in an RRA for a data value. */
1457     unsigned long start_pdp_offset;
1458
1459     rra_start = rra_begin;
1460     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1461         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1462         start_pdp_offset =
1463             rrd->rra_def[rra_idx].pdp_cnt -
1464             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1465         skip_update[rra_idx] = 0;
1466         if (start_pdp_offset <= elapsed_pdp_st) {
1467             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1468                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1469         } else {
1470             rra_step_cnt[rra_idx] = 0;
1471         }
1472
1473         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1474             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1475              * so that they will be correct for the next observed value; note that for
1476              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1477              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1478             if (rra_step_cnt[rra_idx] > 1) {
1479                 skip_update[rra_idx] = 1;
1480                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1481                                 elapsed_pdp_st, last_seasonal_coef);
1482                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1483                                 elapsed_pdp_st + 1, seasonal_coef);
1484             }
1485             /* periodically run a smoother for seasonal effects */
1486             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1487 #ifdef DEBUG
1488                 fprintf(stderr,
1489                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1490                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1491                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1492                         u_cnt);
1493 #endif
1494                 *schedule_smooth = 1;
1495             }
1496             *rra_current = rrd_tell(rrd_file);
1497         }
1498         if (rrd_test_error())
1499             return -1;
1500
1501         if (update_cdp_prep
1502             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1503              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1504              current_cf) == -1) {
1505             return -1;
1506         }
1507         rra_start +=
1508             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1509             sizeof(rrd_value_t);
1510     }
1511     return 0;
1512 }
1513
1514 /* 
1515  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1516  */
1517 static int do_schedule_smooth(
1518     rrd_t *rrd,
1519     unsigned long rra_idx,
1520     unsigned long elapsed_pdp_st)
1521 {
1522     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1523     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1524     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1525     unsigned long seasonal_smooth_idx =
1526         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1527     unsigned long *init_seasonal =
1528         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1529
1530     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1531      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1532      * really an RRA level, not a data source within RRA level parameter, but
1533      * the rra_def is read only for rrd_update (not flushed to disk). */
1534     if (*init_seasonal > BURNIN_CYCLES) {
1535         /* someone has no doubt invented a trick to deal with this wrap around,
1536          * but at least this code is clear. */
1537         if (seasonal_smooth_idx > cur_row) {
1538             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1539              * between PDP and CDP */
1540             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1541         }
1542         /* can't rely on negative numbers because we are working with
1543          * unsigned values */
1544         return (cur_row + elapsed_pdp_st >= row_cnt
1545                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1546     }
1547     /* mark off one of the burn-in cycles */
1548     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1549 }
1550
1551 /*
1552  * For a given RRA, iterate over the data sources and call the appropriate
1553  * consolidation function.
1554  *
1555  * Returns 0 on success, -1 on error.
1556  */
1557 static int update_cdp_prep(
1558     rrd_t *rrd,
1559     unsigned long elapsed_pdp_st,
1560     unsigned long start_pdp_offset,
1561     unsigned long *rra_step_cnt,
1562     int rra_idx,
1563     rrd_value_t *pdp_temp,
1564     rrd_value_t *last_seasonal_coef,
1565     rrd_value_t *seasonal_coef,
1566     int current_cf)
1567 {
1568     unsigned long ds_idx, cdp_idx;
1569
1570     /* update CDP_PREP areas */
1571     /* loop over data soures within each RRA */
1572     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1573
1574         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1575
1576         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1577             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1578                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1579                        elapsed_pdp_st, start_pdp_offset,
1580                        rrd->rra_def[rra_idx].pdp_cnt,
1581                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1582                        rra_idx, ds_idx);
1583         } else {
1584             /* Nothing to consolidate if there's one PDP per CDP. However, if
1585              * we've missed some PDPs, let's update null counters etc. */
1586             if (elapsed_pdp_st > 2) {
1587                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1588                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1589                           current_cf);
1590             }
1591         }
1592
1593         if (rrd_test_error())
1594             return -1;
1595     }                   /* endif data sources loop */
1596     return 0;
1597 }
1598
1599 /*
1600  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1601  * primary value, secondary value, and # of unknowns.
1602  */
1603 static void update_cdp(
1604     unival *scratch,
1605     int current_cf,
1606     rrd_value_t pdp_temp_val,
1607     unsigned long rra_step_cnt,
1608     unsigned long elapsed_pdp_st,
1609     unsigned long start_pdp_offset,
1610     unsigned long pdp_cnt,
1611     rrd_value_t xff,
1612     int i,
1613     int ii)
1614 {
1615     /* shorthand variables */
1616     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1617     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1618     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1619     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1620
1621     if (rra_step_cnt) {
1622         /* If we are in this block, as least 1 CDP value will be written to
1623          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1624          * to be written, then the "fill in" value is the CDP_secondary_val
1625          * entry. */
1626         if (isnan(pdp_temp_val)) {
1627             *cdp_unkn_pdp_cnt += start_pdp_offset;
1628             *cdp_secondary_val = DNAN;
1629         } else {
1630             /* CDP_secondary value is the RRA "fill in" value for intermediary
1631              * CDP data entries. No matter the CF, the value is the same because
1632              * the average, max, min, and last of a list of identical values is
1633              * the same, namely, the value itself. */
1634             *cdp_secondary_val = pdp_temp_val;
1635         }
1636
1637         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1638             *cdp_primary_val = DNAN;
1639             if (current_cf == CF_AVERAGE) {
1640                 *cdp_val =
1641                     initialize_average_carry_over(pdp_temp_val,
1642                                                   elapsed_pdp_st,
1643                                                   start_pdp_offset, pdp_cnt);
1644             } else {
1645                 *cdp_val = pdp_temp_val;
1646             }
1647         } else {
1648             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1649                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1650         }               /* endif meets xff value requirement for a valid value */
1651         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1652          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1653         if (isnan(pdp_temp_val))
1654             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1655         else
1656             *cdp_unkn_pdp_cnt = 0;
1657     } else {            /* rra_step_cnt[i]  == 0 */
1658
1659 #ifdef DEBUG
1660         if (isnan(*cdp_val)) {
1661             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1662                     i, ii);
1663         } else {
1664             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1665                     i, ii, *cdp_val);
1666         }
1667 #endif
1668         if (isnan(pdp_temp_val)) {
1669             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1670         } else {
1671             *cdp_val =
1672                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1673                                   current_cf, i, ii);
1674         }
1675     }
1676 }
1677
1678 /*
1679  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1680  * on the type of consolidation function.
1681  */
1682 static void initialize_cdp_val(
1683     unival *scratch,
1684     int current_cf,
1685     rrd_value_t pdp_temp_val,
1686     unsigned long elapsed_pdp_st,
1687     unsigned long start_pdp_offset,
1688     unsigned long pdp_cnt)
1689 {
1690     rrd_value_t cum_val, cur_val;
1691
1692     switch (current_cf) {
1693     case CF_AVERAGE:
1694         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1695         cur_val = IFDNAN(pdp_temp_val, 0.0);
1696         scratch[CDP_primary_val].u_val =
1697             (cum_val + cur_val * start_pdp_offset) /
1698             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1699         scratch[CDP_val].u_val =
1700             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1701                                           start_pdp_offset, pdp_cnt);
1702         break;
1703     case CF_MAXIMUM:
1704         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1705         cur_val = IFDNAN(pdp_temp_val, -DINF);
1706 #if 0
1707 #ifdef DEBUG
1708         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1709             fprintf(stderr,
1710                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1711                     i, ii);
1712             exit(-1);
1713         }
1714 #endif
1715 #endif
1716         if (cur_val > cum_val)
1717             scratch[CDP_primary_val].u_val = cur_val;
1718         else
1719             scratch[CDP_primary_val].u_val = cum_val;
1720         /* initialize carry over value */
1721         scratch[CDP_val].u_val = pdp_temp_val;
1722         break;
1723     case CF_MINIMUM:
1724         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1725         cur_val = IFDNAN(pdp_temp_val, DINF);
1726 #if 0
1727 #ifdef DEBUG
1728         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1729             fprintf(stderr,
1730                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1731                     ii);
1732             exit(-1);
1733         }
1734 #endif
1735 #endif
1736         if (cur_val < cum_val)
1737             scratch[CDP_primary_val].u_val = cur_val;
1738         else
1739             scratch[CDP_primary_val].u_val = cum_val;
1740         /* initialize carry over value */
1741         scratch[CDP_val].u_val = pdp_temp_val;
1742         break;
1743     case CF_LAST:
1744     default:
1745         scratch[CDP_primary_val].u_val = pdp_temp_val;
1746         /* initialize carry over value */
1747         scratch[CDP_val].u_val = pdp_temp_val;
1748         break;
1749     }
1750 }
1751
1752 /*
1753  * Update the consolidation function for Holt-Winters functions as
1754  * well as other functions that don't actually consolidate multiple
1755  * PDPs.
1756  */
1757 static void reset_cdp(
1758     rrd_t *rrd,
1759     unsigned long elapsed_pdp_st,
1760     rrd_value_t *pdp_temp,
1761     rrd_value_t *last_seasonal_coef,
1762     rrd_value_t *seasonal_coef,
1763     int rra_idx,
1764     int ds_idx,
1765     int cdp_idx,
1766     enum cf_en current_cf)
1767 {
1768     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1769
1770     switch (current_cf) {
1771     case CF_AVERAGE:
1772     default:
1773         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1774         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1775         break;
1776     case CF_SEASONAL:
1777     case CF_DEVSEASONAL:
1778         /* need to update cached seasonal values, so they are consistent
1779          * with the bulk update */
1780         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1781          * CDP_last_deviation are the same. */
1782         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1783         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1784         break;
1785     case CF_HWPREDICT:
1786     case CF_MHWPREDICT:
1787         /* need to update the null_count and last_null_count.
1788          * even do this for non-DNAN pdp_temp because the
1789          * algorithm is not learning from batch updates. */
1790         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1791         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1792         /* fall through */
1793     case CF_DEVPREDICT:
1794         scratch[CDP_primary_val].u_val = DNAN;
1795         scratch[CDP_secondary_val].u_val = DNAN;
1796         break;
1797     case CF_FAILURES:
1798         /* do not count missed bulk values as failures */
1799         scratch[CDP_primary_val].u_val = 0;
1800         scratch[CDP_secondary_val].u_val = 0;
1801         /* need to reset violations buffer.
1802          * could do this more carefully, but for now, just
1803          * assume a bulk update wipes away all violations. */
1804         erase_violations(rrd, cdp_idx, rra_idx);
1805         break;
1806     }
1807 }
1808
1809 static rrd_value_t initialize_average_carry_over(
1810     rrd_value_t pdp_temp_val,
1811     unsigned long elapsed_pdp_st,
1812     unsigned long start_pdp_offset,
1813     unsigned long pdp_cnt)
1814 {
1815     /* initialize carry over value */
1816     if (isnan(pdp_temp_val)) {
1817         return DNAN;
1818     }
1819     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1820 }
1821
1822 /*
1823  * Update or initialize a CDP value based on the consolidation
1824  * function.
1825  *
1826  * Returns the new value.
1827  */
1828 static rrd_value_t calculate_cdp_val(
1829     rrd_value_t cdp_val,
1830     rrd_value_t pdp_temp_val,
1831     unsigned long elapsed_pdp_st,
1832     int current_cf,
1833 #ifdef DEBUG
1834     int i,
1835     int ii
1836 #else
1837     int UNUSED(i),
1838     int UNUSED(ii)
1839 #endif
1840     )
1841 {
1842     if (isnan(cdp_val)) {
1843         if (current_cf == CF_AVERAGE) {
1844             pdp_temp_val *= elapsed_pdp_st;
1845         }
1846 #ifdef DEBUG
1847         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1848                 i, ii, pdp_temp_val);
1849 #endif
1850         return pdp_temp_val;
1851     }
1852     if (current_cf == CF_AVERAGE)
1853         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1854     if (current_cf == CF_MINIMUM)
1855         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1856     if (current_cf == CF_MAXIMUM)
1857         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1858
1859     return pdp_temp_val;
1860 }
1861
1862 /*
1863  * For each RRA, update the seasonal values and then call update_aberrant_CF
1864  * for each data source.
1865  *
1866  * Return 0 on success, -1 on error.
1867  */
1868 static int update_aberrant_cdps(
1869     rrd_t *rrd,
1870     rrd_file_t *rrd_file,
1871     unsigned long rra_begin,
1872     unsigned long *rra_current,
1873     unsigned long elapsed_pdp_st,
1874     rrd_value_t *pdp_temp,
1875     rrd_value_t **seasonal_coef)
1876 {
1877     unsigned long rra_idx, ds_idx, j;
1878
1879     /* number of PDP steps since the last update that
1880      * are assigned to the first CDP to be generated
1881      * since the last update. */
1882     unsigned short scratch_idx;
1883     unsigned long rra_start;
1884     enum cf_en current_cf;
1885
1886     /* this loop is only entered if elapsed_pdp_st < 3 */
1887     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1888          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1889         rra_start = rra_begin;
1890         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1891             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1892                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1893                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1894                     if (scratch_idx == CDP_primary_val) {
1895                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1896                                         elapsed_pdp_st + 1, seasonal_coef);
1897                     } else {
1898                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1899                                         elapsed_pdp_st + 2, seasonal_coef);
1900                     }
1901                     *rra_current = rrd_tell(rrd_file);
1902                 }
1903                 if (rrd_test_error())
1904                     return -1;
1905                 /* loop over data soures within each RRA */
1906                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1907                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1908                                        rra_idx * (rrd->stat_head->ds_cnt) +
1909                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1910                                        *seasonal_coef);
1911                 }
1912             }
1913             rra_start += rrd->rra_def[rra_idx].row_cnt
1914                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1915         }
1916     }
1917     return 0;
1918 }
1919
1920 /* 
1921  * Move sequentially through the file, writing one RRA at a time.  Note this
1922  * architecture divorces the computation of CDP with flushing updated RRA
1923  * entries to disk.
1924  *
1925  * Return 0 on success, -1 on error.
1926  */
1927 static int write_to_rras(
1928     rrd_t *rrd,
1929     rrd_file_t *rrd_file,
1930     unsigned long *rra_step_cnt,
1931     unsigned long rra_begin,
1932     unsigned long *rra_current,
1933     time_t current_time,
1934     unsigned long *skip_update,
1935     rrd_info_t ** pcdp_summary)
1936 {
1937     unsigned long rra_idx;
1938     unsigned long rra_start;
1939     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1940     time_t    rra_time = 0; /* time of update for a RRA */
1941
1942     /* Ready to write to disk */
1943     rra_start = rra_begin;
1944     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1945         /* skip unless there's something to write */
1946         if (rra_step_cnt[rra_idx]) {
1947             /* write the first row */
1948 #ifdef DEBUG
1949             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1950 #endif
1951             rrd->rra_ptr[rra_idx].cur_row++;
1952             if (rrd->rra_ptr[rra_idx].cur_row >=
1953                 rrd->rra_def[rra_idx].row_cnt)
1954                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1955             /* position on the first row */
1956             rra_pos_tmp = rra_start +
1957                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1958                 sizeof(rrd_value_t);
1959             if (rra_pos_tmp != *rra_current) {
1960                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1961                     rrd_set_error("seek error in rrd");
1962                     return -1;
1963                 }
1964                 *rra_current = rra_pos_tmp;
1965             }
1966 #ifdef DEBUG
1967             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1968 #endif
1969             if (!skip_update[rra_idx]) {
1970                 if (*pcdp_summary != NULL) {
1971                     rra_time = (current_time - current_time
1972                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1973                                    rrd->stat_head->pdp_step))
1974                         -
1975                         ((rra_step_cnt[rra_idx] -
1976                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1977                          rrd->stat_head->pdp_step);
1978                 }
1979                 if (write_RRA_row
1980                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1981                      pcdp_summary, rra_time) == -1)
1982                     return -1;
1983             }
1984
1985             /* write other rows of the bulk update, if any */
1986             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1987                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1988                     rrd->rra_def[rra_idx].row_cnt) {
1989 #ifdef DEBUG
1990                     fprintf(stderr,
1991                             "Wraparound for RRA %s, %lu updates left\n",
1992                             rrd->rra_def[rra_idx].cf_nam,
1993                             rra_step_cnt[rra_idx] - 1);
1994 #endif
1995                     /* wrap */
1996                     rrd->rra_ptr[rra_idx].cur_row = 0;
1997                     /* seek back to beginning of current rra */
1998                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1999                         rrd_set_error("seek error in rrd");
2000                         return -1;
2001                     }
2002 #ifdef DEBUG
2003                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
2004                             rrd_file->pos);
2005 #endif
2006                     *rra_current = rra_start;
2007                 }
2008                 if (!skip_update[rra_idx]) {
2009                     if (*pcdp_summary != NULL) {
2010                         rra_time = (current_time - current_time
2011                                     % (rrd->rra_def[rra_idx].pdp_cnt *
2012                                        rrd->stat_head->pdp_step))
2013                             -
2014                             ((rra_step_cnt[rra_idx] -
2015                               2) * rrd->rra_def[rra_idx].pdp_cnt *
2016                              rrd->stat_head->pdp_step);
2017                     }
2018                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2019                                       CDP_secondary_val, pcdp_summary,
2020                                       rra_time) == -1)
2021                         return -1;
2022                 }
2023             }
2024         }
2025         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2026             sizeof(rrd_value_t);
2027     }                   /* RRA LOOP */
2028
2029     return 0;
2030 }
2031
2032 /*
2033  * Write out one row of values (one value per DS) to the archive.
2034  *
2035  * Returns 0 on success, -1 on error.
2036  */
2037 static int write_RRA_row(
2038     rrd_file_t *rrd_file,
2039     rrd_t *rrd,
2040     unsigned long rra_idx,
2041     unsigned long *rra_current,
2042     unsigned short CDP_scratch_idx,
2043     rrd_info_t ** pcdp_summary,
2044     time_t rra_time)
2045 {
2046     unsigned long ds_idx, cdp_idx;
2047     rrd_infoval_t iv;
2048
2049     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2050         /* compute the cdp index */
2051         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2052 #ifdef DEBUG
2053         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2054                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2055                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2056 #endif
2057         if (*pcdp_summary != NULL) {
2058             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2059             /* append info to the return hash */
2060             *pcdp_summary = rrd_info_push(*pcdp_summary,
2061                                           sprintf_alloc
2062                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2063                                            rrd->rra_def[rra_idx].cf_nam,
2064                                            rrd->rra_def[rra_idx].pdp_cnt,
2065                                            rrd->ds_def[ds_idx].ds_nam),
2066                                           RD_I_VAL, iv);
2067         }
2068         if (rrd_write(rrd_file,
2069                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2070                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2071             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2072             return -1;
2073         }
2074         *rra_current += sizeof(rrd_value_t);
2075     }
2076     return 0;
2077 }
2078
2079 /*
2080  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2081  *
2082  * Returns 0 on success, -1 otherwise
2083  */
2084 static int smooth_all_rras(
2085     rrd_t *rrd,
2086     rrd_file_t *rrd_file,
2087     unsigned long rra_begin)
2088 {
2089     unsigned long rra_start = rra_begin;
2090     unsigned long rra_idx;
2091
2092     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2093         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2094             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2095 #ifdef DEBUG
2096             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2097 #endif
2098             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2099             if (rrd_test_error())
2100                 return -1;
2101         }
2102         rra_start += rrd->rra_def[rra_idx].row_cnt
2103             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2104     }
2105     return 0;
2106 }
2107
2108 #ifndef HAVE_MMAP
2109 /*
2110  * Flush changes to disk (unless we're using mmap)
2111  *
2112  * Returns 0 on success, -1 otherwise
2113  */
2114 static int write_changes_to_disk(
2115     rrd_t *rrd,
2116     rrd_file_t *rrd_file,
2117     int version)
2118 {
2119     /* we just need to write back the live header portion now */
2120     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2121                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2122                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2123                  SEEK_SET) != 0) {
2124         rrd_set_error("seek rrd for live header writeback");
2125         return -1;
2126     }
2127     if (version >= 3) {
2128         if (rrd_write(rrd_file, rrd->live_head,
2129                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2130             rrd_set_error("rrd_write live_head to rrd");
2131             return -1;
2132         }
2133     } else {
2134         if (rrd_write(rrd_file, rrd->legacy_last_up,
2135                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2136             rrd_set_error("rrd_write live_head to rrd");
2137             return -1;
2138         }
2139     }
2140
2141
2142     if (rrd_write(rrd_file, rrd->pdp_prep,
2143                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2144         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2145         rrd_set_error("rrd_write pdp_prep to rrd");
2146         return -1;
2147     }
2148
2149     if (rrd_write(rrd_file, rrd->cdp_prep,
2150                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2151                   rrd->stat_head->ds_cnt)
2152         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2153                       rrd->stat_head->ds_cnt)) {
2154
2155         rrd_set_error("rrd_write cdp_prep to rrd");
2156         return -1;
2157     }
2158
2159     if (rrd_write(rrd_file, rrd->rra_ptr,
2160                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2161         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2162         rrd_set_error("rrd_write rra_ptr to rrd");
2163         return -1;
2164     }
2165     return 0;
2166 }
2167 #endif