Fix for HoltWinters phase-shift bug described below.
authoroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Tue, 14 Aug 2007 21:59:31 +0000 (21:59 +0000)
committeroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Tue, 14 Aug 2007 21:59:31 +0000 (21:59 +0000)
When one or more primary data point times were missed, the SEASONAL and
DEVSEASONAL archives were marked as being up-to-date, so that they would not
be written to. It was correct not to write to these archives, but the code
failed to advance the pointers within the SEASONAL and DEVSEASONAL archives
so that future updates would go to the correct location in the archives.

Rather than mark these archives as up-to-date (by setting
rra_step_cnt[rra_idx] = 0), my patch allocates a new "skip_update" array
that is set to 1 for SEASONAL and DEVSEASONAL archives that have missed one
or more primary data points. When an RRA is written to, the cur_row pointer
advancement happens for all archives, but the skip_update array is checked
just before actually writing out the changes.

Please give it a whirl!
-- Evan Miller emiller imvu.com

git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1192 a5681a0c-68f1-0310-ab6d-d61299d08faa

src/rrd_update.c

index 45f635b..bce9c16 100644 (file)
@@ -73,9 +73,9 @@ int       _rrd_update(
     info_t *);
 
 static int allocate_data_structures(
-    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp,
-    const char *tmplt, long **tmpl_idx, unsigned long *tmpl_cnt, 
-    unsigned long **rra_step_cnt, rrd_value_t **pdp_new);
+    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
+    long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
+    unsigned long **skip_update, rrd_value_t **pdp_new);
 
 static int parse_template(rrd_t *rrd, const char *tmplt,
     unsigned long *tmpl_cnt, long *tmpl_idx);
@@ -96,6 +96,7 @@ static int process_arg(
     unsigned long  tmpl_cnt,
     info_t       **pcdp_summary,
     int            version,
+    unsigned long *skip_update,
     int           *schedule_smooth);
 
 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
@@ -127,7 +128,8 @@ static int update_all_cdp_prep(
     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
     rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
-    rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth);
+    rrd_value_t *pdp_temp, unsigned long *rra_current, 
+    unsigned long *skip_update, int *schedule_smooth);
 
 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, 
     unsigned long elapsed_pdp_st);
@@ -165,11 +167,12 @@ static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file,
 
 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
     unsigned long *rra_step_cnt, unsigned long rra_begin, 
-    unsigned long *rra_current, time_t current_time, info_t **pcdp_summary);
+    unsigned long *rra_current, time_t current_time, 
+    unsigned long *skip_update, info_t **pcdp_summary);
 
 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
     unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
-    time_t *rra_time);
+    time_t rra_time);
 
 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
     unsigned long rra_begin);
@@ -355,6 +358,7 @@ int _rrd_update(
     int            version;     /* rrd version */
     rrd_file_t    *rrd_file;
     char          *arg_copy;    /* for processing the argv */
+    unsigned long *skip_update; /* RRAs to advance but not write */
 
     /* need at least 1 arguments: data. */
     if (argc < 1) {
@@ -382,7 +386,7 @@ int _rrd_update(
 
     if (allocate_data_structures(&rrd, &updvals, 
                     &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
-                    &rra_step_cnt, &pdp_new) == -1) {
+                    &rra_step_cnt, &skip_update, &pdp_new) == -1) {
         goto err_close;
     }
 
@@ -395,7 +399,7 @@ int _rrd_update(
         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, 
                         &current_time, &current_time_usec, pdp_temp, pdp_new,
                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, 
-                        version, &schedule_smooth) == -1) {
+                        version, skip_update, &schedule_smooth) == -1) {
             free(arg_copy);
             break;
         }
@@ -431,6 +435,7 @@ int _rrd_update(
     free(pdp_new);
     free(tmpl_idx);
     free(pdp_temp);
+    free(skip_update);
     free(updvals);
     return 0;
 
@@ -438,6 +443,7 @@ int _rrd_update(
     free(pdp_new);
     free(tmpl_idx);
     free(pdp_temp);
+    free(skip_update);
     free(updvals);
   err_close:
     rrd_close(rrd_file);
@@ -491,9 +497,15 @@ int LockRRD(
  * Returns 0 on success, -1 on error.
  */
 static int allocate_data_structures(
-    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
-    long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
-    rrd_value_t **pdp_new) 
+    rrd_t         *rrd, 
+    char        ***updvals, 
+    rrd_value_t  **pdp_temp, 
+    const char    *tmplt, 
+    long         **tmpl_idx, 
+    unsigned long *tmpl_cnt, 
+    unsigned long **rra_step_cnt, 
+    unsigned long **skip_update,
+    rrd_value_t  **pdp_new) 
 {
     unsigned i, ii;
     if ((*updvals = (char **)malloc(sizeof(char *) 
@@ -501,17 +513,20 @@ static int allocate_data_structures(
         rrd_set_error("allocating updvals pointer array");
         return -1;
     }
-
     if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
                            * rrd->stat_head->ds_cnt)) == NULL) {
         rrd_set_error("allocating pdp_temp ...");
         goto err_free_updvals;
     }
-
+    if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
+                           * rrd->stat_head->rra_cnt)) == NULL) {
+        rrd_set_error("allocating skip_update...");
+        goto err_free_pdp_temp;
+    }
     if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
         rrd_set_error("allocating tmpl_idx ...");
-        goto err_free_pdp_temp;
+        goto err_free_skip_update;
     }
     if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) 
                            * (rrd->stat_head->rra_cnt))) == NULL) {
@@ -534,20 +549,24 @@ static int allocate_data_structures(
 
     if (tmplt != NULL) {
         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
-            goto err_free_tmpl_idx;
+            goto err_free_rra_step_cnt;
         }
     }
 
     if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
                           * rrd->stat_head->ds_cnt)) == NULL) {
         rrd_set_error("allocating pdp_new ...");
-        goto err_free_tmpl_idx;
+        goto err_free_rra_step_cnt;
     }
 
     return 0;
 
+err_free_rra_step_cnt:
+    free(*rra_step_cnt);
 err_free_tmpl_idx:
     free(*tmpl_idx);
+err_free_skip_update:
+    free(*skip_update);
 err_free_pdp_temp:
     free(*pdp_temp);
 err_free_updvals:
@@ -621,6 +640,7 @@ static int process_arg(
     unsigned long  tmpl_cnt,
     info_t       **pcdp_summary,
     int            version,
+    unsigned long *skip_update,
     int           *schedule_smooth)
 {
     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
@@ -684,7 +704,7 @@ static int process_arg(
                                 &last_seasonal_coef,
                                 &seasonal_coef,
                                 pdp_temp, rra_current,
-                                schedule_smooth) == -1)
+                                skip_update, schedule_smooth) == -1)
         {
             goto err_free_coefficients;
         }
@@ -693,9 +713,8 @@ static int process_arg(
         {
             goto err_free_coefficients;
         }
-        if (write_to_rras(rrd, rrd_file, 
-                rra_step_cnt, rra_begin, rra_current, 
-                *current_time, pcdp_summary) == -1)
+        if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin, 
+                rra_current, *current_time, skip_update, pcdp_summary) == -1)
         {
             goto err_free_coefficients;
         }
@@ -984,17 +1003,12 @@ static int calculate_elapsed_steps(
     double *post_int,
     unsigned long *proc_pdp_cnt) 
 {
-
-    unsigned long proc_pdp_st;  /* which pdp_st was the last
-                                 * to be processed */
-    unsigned long occu_pdp_st;  /* when was the pdp_st
-                                 * before the last update
+    unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
+    unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
                                  * time */
-    unsigned long proc_pdp_age; /* how old was the data in
-                                 * the pdp prep area when it
-                                 * was last updated */
-    unsigned long occu_pdp_age; /* how long ago was the last
-                                 * pdp_step time */
+    unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
+                                 * when it was last updated */
+    unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
 
     /* when was the current pdp started */
     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
@@ -1182,7 +1196,7 @@ static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
 /*
  * Iterate over all the RRAs for a given DS and:
  * 1. Decide whether to schedule a smooth later
- * 2. Shift the seasonal array if it's a bulk update
+ * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
  * 3. Update the CDP
  *
  * Returns 0 on success, -1 on error
@@ -1191,7 +1205,8 @@ static int update_all_cdp_prep(
     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
     rrd_value_t  **last_seasonal_coef, rrd_value_t  **seasonal_coef,
-    rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth) 
+    rrd_value_t *pdp_temp, unsigned long *rra_current, 
+    unsigned long *skip_update, int *schedule_smooth)
 {
     unsigned long rra_idx;
     /* index into the CDP scratch array */
@@ -1204,6 +1219,7 @@ static int update_all_cdp_prep(
     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
         start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
+        skip_update[rra_idx] = 0;
         if (start_pdp_offset <= elapsed_pdp_st) {
             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
                     rrd->rra_def[rra_idx].pdp_cnt + 1;
@@ -1216,11 +1232,8 @@ static int update_all_cdp_prep(
              * so that they will be correct for the next observed value; note that for
              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
-            if (rra_step_cnt[rra_idx] > 2) {
-                /* skip update by resetting rra_step_cnt[rra_idx], note that this is not data
-                 * source specific; this is due to the bulk update, not a DNAN value
-                 * for the specific data source. */
-                rra_step_cnt[rra_idx] = 0;
+            if (rra_step_cnt[rra_idx] > 1) {
+                skip_update[rra_idx] = 1;
                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                 elapsed_pdp_st, last_seasonal_coef);
                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
@@ -1237,7 +1250,6 @@ static int update_all_cdp_prep(
             }
             *rra_current = rrd_tell(rrd_file);
         }
-        /* if cf is DEVSEASONAL or SEASONAL */
         if (rrd_test_error())
             return -1;
 
@@ -1644,15 +1656,12 @@ static int write_to_rras(
     unsigned long rra_begin, 
     unsigned long *rra_current,
     time_t current_time,
+    unsigned long *skip_update,
     info_t **pcdp_summary)
 {
     unsigned long rra_idx;
     unsigned long rra_start;    
     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
-    /* number of PDP steps since the last update that
-     * are assigned to the first CDP to be generated
-     * since the last update. */
-    unsigned short scratch_idx;
     time_t    rra_time = 0; /* time of update for a RRA */
 
     /* Ready to write to disk */
@@ -1667,7 +1676,7 @@ static int write_to_rras(
             rrd->rra_ptr[rra_idx].cur_row++;
             if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
                 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
-            /* positition on the first row */
+            /* position on the first row */
             rra_pos_tmp = rra_start +
                     (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
                     sizeof(rrd_value_t);
@@ -1681,22 +1690,20 @@ static int write_to_rras(
 #ifdef DEBUG
             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
 #endif
-            scratch_idx = CDP_primary_val;
-            if (*pcdp_summary != NULL) {
-                rra_time = (current_time - current_time
-                                % (rrd->rra_def[rra_idx].pdp_cnt *
-                                        rrd->stat_head->pdp_step))
-                        - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
-                                        rrd->stat_head->pdp_step);
+            if (!skip_update[rra_idx]) {
+                if (*pcdp_summary != NULL) {
+                    rra_time = (current_time - current_time
+                                    % (rrd->rra_def[rra_idx].pdp_cnt *
+                                            rrd->stat_head->pdp_step))
+                            - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
+                                            rrd->stat_head->pdp_step);
+                }
+                if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
+                                        pcdp_summary, rra_time) == -1)
+                    return -1;
             }
-            if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, scratch_idx, 
-                                    pcdp_summary, &rra_time) == -1)
-                return -1;
-            if (rrd_test_error())
-                return -1;
 
             /* write other rows of the bulk update, if any */
-            scratch_idx = CDP_secondary_val;
             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
                 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
 #ifdef DEBUG
@@ -1717,21 +1724,20 @@ static int write_to_rras(
 #endif
                     *rra_current = rra_start;
                 }
-                if (*pcdp_summary != NULL) {
-                    rra_time = (current_time - current_time
-                                    % (rrd->rra_def[rra_idx].pdp_cnt *
-                                            rrd->stat_head->pdp_step))
-                            -
-                            ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
-                             rrd->stat_head->pdp_step);
+                if (!skip_update[rra_idx]) {
+                    if (*pcdp_summary != NULL) {
+                        rra_time = (current_time - current_time
+                                        % (rrd->rra_def[rra_idx].pdp_cnt *
+                                                rrd->stat_head->pdp_step))
+                                -
+                                ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
+                                 rrd->stat_head->pdp_step);
+                    }
+                    if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
+                                            CDP_secondary_val, pcdp_summary, rra_time) == -1)
+                        return -1;
                 }
-                if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
-                                        scratch_idx, pcdp_summary, &rra_time) == -1)
-                    return -1;
             }
-
-            if (rrd_test_error())
-                return -1;
         }
         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
                 sizeof(rrd_value_t);
@@ -1752,7 +1758,7 @@ static int write_RRA_row(
     unsigned long *rra_current,
     unsigned short CDP_scratch_idx,
     info_t **pcdp_summary,
-    time_t *rra_time)
+    time_t rra_time)
 {
     unsigned long ds_idx, cdp_idx;
     infoval   iv;
@@ -1769,7 +1775,7 @@ static int write_RRA_row(
             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
             /* append info to the return hash */
             *pcdp_summary = info_push(*pcdp_summary,
-                     sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", *rra_time,
+                     sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
                            rrd->rra_def[rra_idx].cf_nam,
                            rrd->rra_def[rra_idx].pdp_cnt,
                            rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);