+ /* initialize tmplt redirector */
+ /* default config example (assume DS 1 is a CDEF DS)
+ tmpl_idx[0] -> 0; (time)
+ tmpl_idx[1] -> 1; (DS 0)
+ tmpl_idx[2] -> 3; (DS 2)
+ tmpl_idx[3] -> 4; (DS 3) */
+ (*tmpl_idx)[0] = 0; /* time */
+ for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
+ if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
+ (*tmpl_idx)[ii++] = i;
+ }
+ *tmpl_cnt = ii;
+
+ if (tmplt != NULL) {
+ if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
+ goto err_free_rra_step_cnt;
+ }
+ }
+
+ if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
+ * rrd->stat_head->ds_cnt)) == NULL) {
+ rrd_set_error("allocating pdp_new.");
+ goto err_free_rra_step_cnt;
+ }
+
+ return 0;
+
+ err_free_rra_step_cnt:
+ free(*rra_step_cnt);
+ err_free_tmpl_idx:
+ free(*tmpl_idx);
+ err_free_skip_update:
+ free(*skip_update);
+ err_free_pdp_temp:
+ free(*pdp_temp);
+ err_free_updvals:
+ free(*updvals);
+ return -1;
+}
+
+/*
+ * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
+ *
+ * Returns 0 on success.
+ */
+static int parse_template(
+ rrd_t *rrd,
+ const char *tmplt,
+ unsigned long *tmpl_cnt,
+ long *tmpl_idx)
+{
+ char *dsname, *tmplt_copy;
+ unsigned int tmpl_len, i;
+ int ret = 0;
+
+ *tmpl_cnt = 1; /* the first entry is the time */
+
+ /* we should work on a writeable copy here */
+ if ((tmplt_copy = strdup(tmplt)) == NULL) {
+ rrd_set_error("error copying tmplt '%s'", tmplt);
+ ret = -1;
+ goto out;
+ }
+
+ dsname = tmplt_copy;
+ tmpl_len = strlen(tmplt_copy);
+ for (i = 0; i <= tmpl_len; i++) {
+ if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
+ tmplt_copy[i] = '\0';
+ if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
+ rrd_set_error("tmplt contains more DS definitions than RRD");
+ ret = -1;
+ goto out_free_tmpl_copy;
+ }
+ if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
+ rrd_set_error("unknown DS name '%s'", dsname);
+ ret = -1;
+ goto out_free_tmpl_copy;
+ }
+ /* go to the next entry on the tmplt_copy */
+ if (i < tmpl_len)
+ dsname = &tmplt_copy[i + 1];
+ }
+ }
+ out_free_tmpl_copy:
+ free(tmplt_copy);
+ out:
+ return ret;
+}
+
+/*
+ * Parse an update string, updates the primary data points (PDPs)
+ * and consolidated data points (CDPs), and writes changes to the RRAs.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int process_arg(
+ char *step_start,
+ rrd_t *rrd,
+ rrd_file_t *rrd_file,
+ unsigned long rra_begin,
+ unsigned long *rra_current,
+ time_t *current_time,
+ unsigned long *current_time_usec,
+ rrd_value_t *pdp_temp,
+ rrd_value_t *pdp_new,
+ unsigned long *rra_step_cnt,
+ char **updvals,
+ long *tmpl_idx,
+ unsigned long tmpl_cnt,
+ info_t **pcdp_summary,
+ int version,
+ unsigned long *skip_update,
+ int *schedule_smooth)
+{
+ rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
+
+ /* a vector of future Holt-Winters seasonal coefs */
+ unsigned long elapsed_pdp_st;
+
+ double interval, pre_int, post_int; /* interval between this and
+ * the last run */
+ unsigned long proc_pdp_cnt;
+ unsigned long rra_start;
+
+ if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
+ current_time, current_time_usec, version) == -1) {
+ return -1;
+ }
+ /* seek to the beginning of the rra's */
+ if (*rra_current != rra_begin) {
+#ifndef HAVE_MMAP
+ if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
+ rrd_set_error("seek error in rrd");
+ return -1;
+ }
+#endif
+ *rra_current = rra_begin;
+ }
+ rra_start = rra_begin;
+
+ interval = (double) (*current_time - rrd->live_head->last_up)
+ + (double) ((long) *current_time_usec -
+ (long) rrd->live_head->last_up_usec) / 1e6f;
+
+ /* process the data sources and update the pdp_prep
+ * area accordingly */
+ if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
+ return -1;
+ }
+
+ elapsed_pdp_st = calculate_elapsed_steps(rrd,
+ *current_time,
+ *current_time_usec, interval,
+ &pre_int, &post_int,
+ &proc_pdp_cnt);
+
+ /* has a pdp_st moment occurred since the last run ? */
+ if (elapsed_pdp_st == 0) {
+ /* no we have not passed a pdp_st moment. therefore update is simple */
+ simple_update(rrd, interval, pdp_new);
+ } else {
+ /* an pdp_st has occurred. */
+ if (process_all_pdp_st(rrd, interval,
+ pre_int, post_int,
+ elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
+ return -1;
+ }
+ if (update_all_cdp_prep(rrd, rra_step_cnt,
+ rra_begin, rrd_file,
+ elapsed_pdp_st,
+ proc_pdp_cnt,
+ &last_seasonal_coef,
+ &seasonal_coef,
+ pdp_temp, rra_current,
+ skip_update, schedule_smooth) == -1) {
+ goto err_free_coefficients;
+ }
+ if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
+ elapsed_pdp_st, pdp_temp,
+ &seasonal_coef) == -1) {
+ goto err_free_coefficients;
+ }
+ if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
+ rra_current, *current_time, skip_update,
+ pcdp_summary) == -1) {
+ goto err_free_coefficients;
+ }
+ } /* endif a pdp_st has occurred */
+ rrd->live_head->last_up = *current_time;
+ rrd->live_head->last_up_usec = *current_time_usec;
+
+ free(seasonal_coef);
+ free(last_seasonal_coef);
+ return 0;
+
+ err_free_coefficients:
+ free(seasonal_coef);
+ free(last_seasonal_coef);
+ return -1;
+}
+
+/*
+ * Parse a DS string (time + colon-separated values), storing the
+ * results in current_time, current_time_usec, and updvals.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int parse_ds(
+ rrd_t *rrd,
+ char **updvals,
+ long *tmpl_idx,
+ char *input,
+ unsigned long tmpl_cnt,
+ time_t *current_time,
+ unsigned long *current_time_usec,
+ int version)
+{
+ char *p;
+ unsigned long i;
+ char timesyntax;
+
+ updvals[0] = input;
+ /* initialize all ds input to unknown except the first one
+ which has always got to be set */
+ for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
+ updvals[i] = "U";
+
+ /* separate all ds elements; first must be examined separately
+ due to alternate time syntax */
+ if ((p = strchr(input, '@')) != NULL) {
+ timesyntax = '@';
+ } else if ((p = strchr(input, ':')) != NULL) {
+ timesyntax = ':';
+ } else {
+ rrd_set_error("expected timestamp not found in data source from %s",
+ input);
+ return -1;
+ }
+ *p = '\0';
+ i = 1;
+ updvals[tmpl_idx[i++]] = p + 1;
+ while (*(++p)) {
+ if (*p == ':') {
+ *p = '\0';
+ if (i < tmpl_cnt) {
+ updvals[tmpl_idx[i++]] = p + 1;
+ }
+ }
+ }
+
+ if (i != tmpl_cnt) {
+ rrd_set_error("expected %lu data source readings (got %lu) from %s",
+ tmpl_cnt - 1, i, input);
+ return -1;
+ }
+
+ if (get_time_from_reading(rrd, timesyntax, updvals,
+ current_time, current_time_usec,
+ version) == -1) {
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Parse the time in a DS string, store it in current_time and
+ * current_time_usec and verify that it's later than the last
+ * update for this DS.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int get_time_from_reading(
+ rrd_t *rrd,
+ char timesyntax,
+ char **updvals,
+ time_t *current_time,
+ unsigned long *current_time_usec,
+ int version)
+{
+ double tmp;
+ char *parsetime_error = NULL;
+ char *old_locale;
+ struct rrd_time_value ds_tv;
+ struct timeval tmp_time; /* used for time conversion */
+
+ /* get the time from the reading ... handle N */
+ if (timesyntax == '@') { /* at-style */
+ if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
+ rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
+ return -1;
+ }
+ if (ds_tv.type == RELATIVE_TO_END_TIME ||
+ ds_tv.type == RELATIVE_TO_START_TIME) {
+ rrd_set_error("specifying time relative to the 'start' "
+ "or 'end' makes no sense here: %s", updvals[0]);
+ return -1;
+ }
+ *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
+ *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
+ } else if (strcmp(updvals[0], "N") == 0) {
+ gettimeofday(&tmp_time, 0);
+ normalize_time(&tmp_time);
+ *current_time = tmp_time.tv_sec;
+ *current_time_usec = tmp_time.tv_usec;
+ } else {
+ old_locale = setlocale(LC_NUMERIC, "C");
+ tmp = strtod(updvals[0], 0);
+ setlocale(LC_NUMERIC, old_locale);
+ *current_time = floor(tmp);
+ *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
+ }
+ /* dont do any correction for old version RRDs */
+ if (version < 3)
+ *current_time_usec = 0;
+
+ if (*current_time < rrd->live_head->last_up ||
+ (*current_time == rrd->live_head->last_up &&
+ (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
+ rrd_set_error("illegal attempt to update using time %ld when "
+ "last update time is %ld (minimum one second step)",
+ *current_time, rrd->live_head->last_up);
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Update pdp_new by interpreting the updvals according to the DS type
+ * (COUNTER, GAUGE, etc.).
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int update_pdp_prep(
+ rrd_t *rrd,
+ char **updvals,
+ rrd_value_t *pdp_new,
+ double interval)
+{
+ unsigned long ds_idx;
+ int ii;
+ char *endptr; /* used in the conversion */
+ double rate;
+ char *old_locale;
+ enum dst_en dst_idx;
+
+ for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
+ dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
+
+ /* make sure we do not build diffs with old last_ds values */
+ if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
+ strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
+ rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
+ }
+
+ /* NOTE: DST_CDEF should never enter this if block, because
+ * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
+ * accidently specified a value for the DST_CDEF. To handle this case,
+ * an extra check is required. */
+
+ if ((updvals[ds_idx + 1][0] != 'U') &&
+ (dst_idx != DST_CDEF) &&
+ rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
+ rate = DNAN;
+
+ /* pdp_new contains rate * time ... eg the bytes transferred during
+ * the interval. Doing it this way saves a lot of math operations
+ */
+ switch (dst_idx) {
+ case DST_COUNTER:
+ case DST_DERIVE:
+ for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
+ if ((updvals[ds_idx + 1][ii] < '0'
+ || updvals[ds_idx + 1][ii] > '9')
+ && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
+ rrd_set_error("not a simple integer: '%s'",
+ updvals[ds_idx + 1]);
+ return -1;