X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_update.c;h=bce9c16a7a688595130023f8a4911dff55bb05f1;hp=45f635bfaa46774f533e2453fb5f31dc17e32b30;hb=c3aa5c187612865ff1091d7f79e0cc2efa737cff;hpb=2b68c471b85ba4243c12c78f6c774a297745a177 diff --git a/src/rrd_update.c b/src/rrd_update.c index 45f635b..bce9c16 100644 --- a/src/rrd_update.c +++ b/src/rrd_update.c @@ -73,9 +73,9 @@ int _rrd_update( info_t *); static int allocate_data_structures( - rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, - const char *tmplt, long **tmpl_idx, unsigned long *tmpl_cnt, - unsigned long **rra_step_cnt, rrd_value_t **pdp_new); + rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, + long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, + unsigned long **skip_update, rrd_value_t **pdp_new); static int parse_template(rrd_t *rrd, const char *tmplt, unsigned long *tmpl_cnt, long *tmpl_idx); @@ -96,6 +96,7 @@ static int process_arg( unsigned long tmpl_cnt, info_t **pcdp_summary, int version, + unsigned long *skip_update, int *schedule_smooth); static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input, @@ -127,7 +128,8 @@ static int update_all_cdp_prep( rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt, rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef, - rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth); + rrd_value_t *pdp_temp, unsigned long *rra_current, + unsigned long *skip_update, int *schedule_smooth); static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, unsigned long elapsed_pdp_st); @@ -165,11 +167,12 @@ static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file, static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, unsigned long *rra_step_cnt, unsigned long rra_begin, - unsigned long *rra_current, time_t current_time, info_t **pcdp_summary); + unsigned long *rra_current, time_t current_time, + unsigned long *skip_update, info_t **pcdp_summary); static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary, - time_t *rra_time); + time_t rra_time); static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin); @@ -355,6 +358,7 @@ int _rrd_update( int version; /* rrd version */ rrd_file_t *rrd_file; char *arg_copy; /* for processing the argv */ + unsigned long *skip_update; /* RRAs to advance but not write */ /* need at least 1 arguments: data. */ if (argc < 1) { @@ -382,7 +386,7 @@ int _rrd_update( if (allocate_data_structures(&rrd, &updvals, &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt, - &rra_step_cnt, &pdp_new) == -1) { + &rra_step_cnt, &skip_update, &pdp_new) == -1) { goto err_close; } @@ -395,7 +399,7 @@ int _rrd_update( if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, ¤t_time, ¤t_time_usec, pdp_temp, pdp_new, rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, - version, &schedule_smooth) == -1) { + version, skip_update, &schedule_smooth) == -1) { free(arg_copy); break; } @@ -431,6 +435,7 @@ int _rrd_update( free(pdp_new); free(tmpl_idx); free(pdp_temp); + free(skip_update); free(updvals); return 0; @@ -438,6 +443,7 @@ int _rrd_update( free(pdp_new); free(tmpl_idx); free(pdp_temp); + free(skip_update); free(updvals); err_close: rrd_close(rrd_file); @@ -491,9 +497,15 @@ int LockRRD( * Returns 0 on success, -1 on error. */ static int allocate_data_structures( - rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, - long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, - rrd_value_t **pdp_new) + rrd_t *rrd, + char ***updvals, + rrd_value_t **pdp_temp, + const char *tmplt, + long **tmpl_idx, + unsigned long *tmpl_cnt, + unsigned long **rra_step_cnt, + unsigned long **skip_update, + rrd_value_t **pdp_new) { unsigned i, ii; if ((*updvals = (char **)malloc(sizeof(char *) @@ -501,17 +513,20 @@ static int allocate_data_structures( rrd_set_error("allocating updvals pointer array"); return -1; } - if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) == NULL) { rrd_set_error("allocating pdp_temp ..."); goto err_free_updvals; } - + if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long) + * rrd->stat_head->rra_cnt)) == NULL) { + rrd_set_error("allocating skip_update..."); + goto err_free_pdp_temp; + } if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long) * (rrd->stat_head->ds_cnt + 1))) == NULL) { rrd_set_error("allocating tmpl_idx ..."); - goto err_free_pdp_temp; + goto err_free_skip_update; } if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) * (rrd->stat_head->rra_cnt))) == NULL) { @@ -534,20 +549,24 @@ static int allocate_data_structures( if (tmplt != NULL) { if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) { - goto err_free_tmpl_idx; + goto err_free_rra_step_cnt; } } if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) == NULL) { rrd_set_error("allocating pdp_new ..."); - goto err_free_tmpl_idx; + goto err_free_rra_step_cnt; } return 0; +err_free_rra_step_cnt: + free(*rra_step_cnt); err_free_tmpl_idx: free(*tmpl_idx); +err_free_skip_update: + free(*skip_update); err_free_pdp_temp: free(*pdp_temp); err_free_updvals: @@ -621,6 +640,7 @@ static int process_arg( unsigned long tmpl_cnt, info_t **pcdp_summary, int version, + unsigned long *skip_update, int *schedule_smooth) { rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL; @@ -684,7 +704,7 @@ static int process_arg( &last_seasonal_coef, &seasonal_coef, pdp_temp, rra_current, - schedule_smooth) == -1) + skip_update, schedule_smooth) == -1) { goto err_free_coefficients; } @@ -693,9 +713,8 @@ static int process_arg( { goto err_free_coefficients; } - if (write_to_rras(rrd, rrd_file, - rra_step_cnt, rra_begin, rra_current, - *current_time, pcdp_summary) == -1) + if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin, + rra_current, *current_time, skip_update, pcdp_summary) == -1) { goto err_free_coefficients; } @@ -984,17 +1003,12 @@ static int calculate_elapsed_steps( double *post_int, unsigned long *proc_pdp_cnt) { - - unsigned long proc_pdp_st; /* which pdp_st was the last - * to be processed */ - unsigned long occu_pdp_st; /* when was the pdp_st - * before the last update + unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */ + unsigned long occu_pdp_st; /* when was the pdp_st before the last update * time */ - unsigned long proc_pdp_age; /* how old was the data in - * the pdp prep area when it - * was last updated */ - unsigned long occu_pdp_age; /* how long ago was the last - * pdp_step time */ + unsigned long proc_pdp_age; /* how old was the data in the pdp prep area + * when it was last updated */ + unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */ /* when was the current pdp started */ proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step; @@ -1182,7 +1196,7 @@ static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval, /* * Iterate over all the RRAs for a given DS and: * 1. Decide whether to schedule a smooth later - * 2. Shift the seasonal array if it's a bulk update + * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL * 3. Update the CDP * * Returns 0 on success, -1 on error @@ -1191,7 +1205,8 @@ static int update_all_cdp_prep( rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt, rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef, - rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth) + rrd_value_t *pdp_temp, unsigned long *rra_current, + unsigned long *skip_update, int *schedule_smooth) { unsigned long rra_idx; /* index into the CDP scratch array */ @@ -1204,6 +1219,7 @@ static int update_all_cdp_prep( for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) { current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam); start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt; + skip_update[rra_idx] = 0; if (start_pdp_offset <= elapsed_pdp_st) { rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) / rrd->rra_def[rra_idx].pdp_cnt + 1; @@ -1216,11 +1232,8 @@ static int update_all_cdp_prep( * so that they will be correct for the next observed value; note that for * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL; * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */ - if (rra_step_cnt[rra_idx] > 2) { - /* skip update by resetting rra_step_cnt[rra_idx], note that this is not data - * source specific; this is due to the bulk update, not a DNAN value - * for the specific data source. */ - rra_step_cnt[rra_idx] = 0; + if (rra_step_cnt[rra_idx] > 1) { + skip_update[rra_idx] = 1; lookup_seasonal(rrd, rra_idx, rra_start, rrd_file, elapsed_pdp_st, last_seasonal_coef); lookup_seasonal(rrd, rra_idx, rra_start, rrd_file, @@ -1237,7 +1250,6 @@ static int update_all_cdp_prep( } *rra_current = rrd_tell(rrd_file); } - /* if cf is DEVSEASONAL or SEASONAL */ if (rrd_test_error()) return -1; @@ -1644,15 +1656,12 @@ static int write_to_rras( unsigned long rra_begin, unsigned long *rra_current, time_t current_time, + unsigned long *skip_update, info_t **pcdp_summary) { unsigned long rra_idx; unsigned long rra_start; unsigned long rra_pos_tmp; /* temporary byte pointer. */ - /* number of PDP steps since the last update that - * are assigned to the first CDP to be generated - * since the last update. */ - unsigned short scratch_idx; time_t rra_time = 0; /* time of update for a RRA */ /* Ready to write to disk */ @@ -1667,7 +1676,7 @@ static int write_to_rras( rrd->rra_ptr[rra_idx].cur_row++; if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt) rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */ - /* positition on the first row */ + /* position on the first row */ rra_pos_tmp = rra_start + (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) * sizeof(rrd_value_t); @@ -1681,22 +1690,20 @@ static int write_to_rras( #ifdef DEBUG fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos); #endif - scratch_idx = CDP_primary_val; - if (*pcdp_summary != NULL) { - rra_time = (current_time - current_time - % (rrd->rra_def[rra_idx].pdp_cnt * - rrd->stat_head->pdp_step)) - - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt * - rrd->stat_head->pdp_step); + if (!skip_update[rra_idx]) { + if (*pcdp_summary != NULL) { + rra_time = (current_time - current_time + % (rrd->rra_def[rra_idx].pdp_cnt * + rrd->stat_head->pdp_step)) + - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt * + rrd->stat_head->pdp_step); + } + if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val, + pcdp_summary, rra_time) == -1) + return -1; } - if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, scratch_idx, - pcdp_summary, &rra_time) == -1) - return -1; - if (rrd_test_error()) - return -1; /* write other rows of the bulk update, if any */ - scratch_idx = CDP_secondary_val; for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) { if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) { #ifdef DEBUG @@ -1717,21 +1724,20 @@ static int write_to_rras( #endif *rra_current = rra_start; } - if (*pcdp_summary != NULL) { - rra_time = (current_time - current_time - % (rrd->rra_def[rra_idx].pdp_cnt * - rrd->stat_head->pdp_step)) - - - ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt * - rrd->stat_head->pdp_step); + if (!skip_update[rra_idx]) { + if (*pcdp_summary != NULL) { + rra_time = (current_time - current_time + % (rrd->rra_def[rra_idx].pdp_cnt * + rrd->stat_head->pdp_step)) + - + ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt * + rrd->stat_head->pdp_step); + } + if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, + CDP_secondary_val, pcdp_summary, rra_time) == -1) + return -1; } - if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, - scratch_idx, pcdp_summary, &rra_time) == -1) - return -1; } - - if (rrd_test_error()) - return -1; } rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t); @@ -1752,7 +1758,7 @@ static int write_RRA_row( unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary, - time_t *rra_time) + time_t rra_time) { unsigned long ds_idx, cdp_idx; infoval iv; @@ -1769,7 +1775,7 @@ static int write_RRA_row( iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val; /* append info to the return hash */ *pcdp_summary = info_push(*pcdp_summary, - sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", *rra_time, + sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time, rrd->rra_def[rra_idx].cf_nam, rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);