X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_update.c;h=78f9a572e02e3bd70bedcdb6022899a61195fcc1;hb=1edac007f72e1a520764b767619ef075ccd79dbc;hp=c1849f398d5f578be93d4cc327e902eadfeac026;hpb=2a6a270edfda89b04722b42b57992907f871c671;p=rrdtool.git diff --git a/src/rrd_update.c b/src/rrd_update.c index c1849f3..78f9a57 100644 --- a/src/rrd_update.c +++ b/src/rrd_update.c @@ -7,11 +7,6 @@ *****************************************************************************/ #include "rrd_tool.h" -#include -#include -#ifdef HAVE_MMAP -# include -#endif #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) #include @@ -61,10 +56,10 @@ static int gettimeofday( #endif /* - * normilize time as returned by gettimeofday. usec part must + * normalize time as returned by gettimeofday. usec part must * be always >= 0 */ -static void normalize_time( +static inline void normalize_time( struct timeval *t) { if (t->tv_usec < 0) { @@ -73,34 +68,51 @@ static void normalize_time( } } -/* Local prototypes */ -int LockRRD( - int in_file); - -#ifdef HAVE_MMAP -info_t *write_RRA_row( +static inline info_t *write_RRA_row( + rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current, unsigned short CDP_scratch_idx, -#ifndef DEBUG - int UNUSED(in_file), -#else - int in_file, -#endif info_t *pcdp_summary, - time_t *rra_time, - void *rrd_mmaped_file); -#else -info_t *write_RRA_row( - rrd_t *rrd, - unsigned long rra_idx, - unsigned long *rra_current, - unsigned short CDP_scratch_idx, - int in_file, - info_t *pcdp_summary, - time_t *rra_time); + time_t *rra_time) +{ + unsigned long ds_idx, cdp_idx; + infoval iv; + + for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) { + /* compute the cdp index */ + cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx; +#ifdef DEBUG + fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n", + rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val, + rrd_file->pos, rrd->rra_def[rra_idx].cf_nam); #endif + if (pcdp_summary != NULL) { + iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val; + /* append info to the return hash */ + pcdp_summary = info_push(pcdp_summary, + sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", + *rra_time, + rrd->rra_def[rra_idx]. + cf_nam, + rrd->rra_def[rra_idx]. + pdp_cnt, + rrd->ds_def[ds_idx]. + ds_nam), RD_I_VAL, iv); + } + if (rrd_write + (rrd_file, + &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val), + sizeof(rrd_value_t)) != sizeof(rrd_value_t)) { + rrd_set_error("writing rrd: %s", rrd_strerror(errno)); + return 0; + } + *rra_current += sizeof(rrd_value_t); + } + return (pcdp_summary); +} + int rrd_update_r( const char *filename, const char *tmplt, @@ -259,7 +271,7 @@ int _rrd_update( rrd_value_t *pdp_new; /* prepare the incoming data * to be added the the * existing entry */ - rrd_value_t *pdp_temp; /* prepare the pdp values + rrd_value_t *pdp_temp; /* prepare the pdp values * to be added the the * cdp values */ @@ -306,13 +318,15 @@ int _rrd_update( /* need at least 1 arguments: data. */ if (argc < 1) { rrd_set_error("Not enough arguments"); - return -1; + goto err_out; } rrd_file = rrd_open(filename, &rrd, RRD_READWRITE); if (rrd_file == NULL) { - return -1; + goto err_free; } + /* We are now at the beginning of the rra's */ + rra_current = rra_start = rra_begin = rrd_file->header_len; /* initialize time */ version = atoi(rrd.stat_head->version); @@ -325,61 +339,30 @@ int _rrd_update( current_time_usec = 0; } - rra_current = rra_start = rra_begin = rrd_file->header_len; - /* This is defined in the ANSI C standard, section 7.9.5.3: - - When a file is opened with udpate mode ('+' as the second - or third character in the ... list of mode argument - variables), both input and output may be performed on the - associated stream. However, ... input may not be directly - followed by output without an intervening call to a file - positioning function, unless the input operation encounters - end-of-file. */ -#if 0 //def HAVE_MMAP - rrd_filesize = rrd_file->file_size; - fseek(rrd_file->fd, 0, SEEK_END); - rrd_filesize = ftell(rrd_file->fd); - fseek(rrd_file->fd, rra_current, SEEK_SET); -#else -// fseek(rrd_file->fd, 0, SEEK_CUR); -#endif - - /* get exclusive lock to whole file. * lock gets removed when we close the file. */ if (LockRRD(rrd_file->fd) != 0) { rrd_set_error("could not lock RRD"); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_close; } if ((updvals = malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) { rrd_set_error("allocating updvals pointer array"); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_close; } if ((pdp_temp = malloc(sizeof(rrd_value_t) * rrd.stat_head->ds_cnt)) == NULL) { rrd_set_error("allocating pdp_temp ..."); - free(updvals); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_free_updvals; } if ((tmpl_idx = malloc(sizeof(unsigned long) * (rrd.stat_head->ds_cnt + 1))) == NULL) { rrd_set_error("allocating tmpl_idx ..."); - free(pdp_temp); - free(updvals); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_temp; } /* initialize tmplt redirector */ /* default config example (assume DS 1 is a CDEF DS) @@ -409,22 +392,11 @@ int _rrd_update( if (tmpl_cnt > rrd.stat_head->ds_cnt) { rrd_set_error ("tmplt contains more DS definitions than RRD"); - free(updvals); - free(pdp_temp); - free(tmpl_idx); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_free_tmpl_idx; } if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) { rrd_set_error("unknown DS name '%s'", dsname); - free(updvals); - free(pdp_temp); - free(tmplt_copy); - free(tmpl_idx); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_free_tmpl_idx; } else { /* the first element is always the time */ tmpl_idx[tmpl_cnt - 1]++; @@ -443,33 +415,8 @@ int _rrd_update( if ((pdp_new = malloc(sizeof(rrd_value_t) * rrd.stat_head->ds_cnt)) == NULL) { rrd_set_error("allocating pdp_new ..."); - free(updvals); - free(pdp_temp); - free(tmpl_idx); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); - } -#if 0 //def HAVE_MMAP - rrd_mmaped_file = mmap(0, - rrd_file->file_len, - PROT_READ | PROT_WRITE, - MAP_SHARED, fileno(in_file), 0); - if (rrd_mmaped_file == MAP_FAILED) { - rrd_set_error("error mmapping file %s", filename); - free(updvals); - free(pdp_temp); - free(tmpl_idx); - rrd_free(&rrd); - close(rrd_file->fd); - return (-1); + goto err_free_tmpl_idx; } -#ifdef USE_MADVISE - /* when we use mmaping we tell the kernel the mmap equivalent - of POSIX_FADV_RANDOM */ - madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM); -#endif -#endif /* loop through the arguments. */ for (arg_i = 0; arg_i < argc; arg_i++) { char *stepper = strdup(argv[arg_i]); @@ -482,20 +429,11 @@ int _rrd_update( if (stepper == NULL) { rrd_set_error("failed duplication argv entry"); free(step_start); - free(updvals); - free(pdp_temp); - free(tmpl_idx); - rrd_free(&rrd); -#ifdef HAVE_MMAP - rrd_close(rrd_file); -#endif - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } /* initialize all ds input to unknown except the first one which has always got to be set */ - for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++) - updvals[ii] = "U"; + memset(updvals + 1, 'U', rrd.stat_head->ds_cnt); updvals[0] = stepper; /* separate all ds elements; first must be examined separately due to alternate time syntax */ @@ -551,6 +489,7 @@ int _rrd_update( } current_time = mktime(&ds_tv.tm) + ds_tv.offset; + current_time_usec = 0; /* FIXME: how to handle usecs here ? */ } else if (strcmp(updvals[0], "N") == 0) { @@ -581,7 +520,6 @@ int _rrd_update( break; } - /* seek to the beginning of the rra's */ if (rra_current != rra_begin) { #ifndef HAVE_MMAP @@ -647,7 +585,7 @@ int _rrd_update( /* NOTE: DST_CDEF should never enter this if block, because * updvals[i+1][0] is initialized to 'U'; unless the caller - * accidently specified a value for the DST_CDEF. To handle + * accidently specified a value for the DST_CDEF. To handle * this case, an extra check is required. */ if ((updvals[i + 1][0] != 'U') && @@ -659,8 +597,6 @@ int _rrd_update( /* pdp_new contains rate * time ... eg the bytes * transferred during the interval. Doing it this way saves * a lot of math operations */ - - switch (dst_idx) { case DST_COUNTER: case DST_DERIVE: @@ -702,7 +638,7 @@ int _rrd_update( errno = 0; pdp_new[i] = strtod(updvals[i + 1], &endptr); if (errno > 0) { - rrd_set_error("converting '%s' to float: %s", + rrd_set_error("converting '%s' to float: %s", updvals[i + 1], rrd_strerror(errno)); break; }; @@ -718,7 +654,7 @@ int _rrd_update( errno = 0; pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval; if (errno > 0) { - rrd_set_error("converting '%s' to float: %s", + rrd_set_error("converting '%s' to float: %s", updvals[i + 1], rrd_strerror(errno)); break; }; @@ -804,21 +740,21 @@ int _rrd_update( } else { /* an pdp_st has occurred. */ - /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which - * occurred up to the last run. - pdp_new[] contains rate*seconds from the latest run. - pdp_temp[] will contain the rate for cdp */ + /* in pdp_prep[].scratch[PDP_val].u_val we have collected + rate*seconds which occurred up to the last run. + pdp_new[] contains rate*seconds from the latest run. + pdp_temp[] will contain the rate for cdp */ for (i = 0; i < rrd.stat_head->ds_cnt; i++) { /* update pdp_prep to the current pdp_st. */ double pre_unknown = 0.0; - if (isnan(pdp_new[i])) + if (isnan(pdp_new[i])) { /* a final bit of unkonwn to be added bevore calculation - * we use a tempaorary variable for this so that we - * don't have to turn integer lines before using the value */ + we use a temporary variable for this so that we + don't have to turn integer lines before using the value */ pre_unknown = pre_int; - else { + } else { if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) { rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i] / interval * pre_int; @@ -831,9 +767,9 @@ int _rrd_update( /* if too much of the pdp_prep is unknown we dump it */ if ( - /* removed because this does not agree with the definition - a heart beat can be unknown */ - /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt + /* removed because this does not agree with the + definition that a heartbeat can be unknown */ + /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */ /* if the interval is larger thatn mrhb we get NAN */ (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || @@ -931,16 +867,17 @@ int _rrd_update( } if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) { - /* If this is a bulk update, we need to skip ahead in the seasonal - * arrays so that they will be correct for the next observed value; - * note that for the bulk update itself, no update will occur to - * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will - * be set to DNAN. */ + /* If this is a bulk update, we need to skip ahead in + the seasonal arrays so that they will be correct for + the next observed value; + note that for the bulk update itself, no update will + occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT + and DEVPREDICT will be set to DNAN. */ if (rra_step_cnt[i] > 2) { /* skip update by resetting rra_step_cnt[i], - * note that this is not data source specific; this is due - * to the bulk update, not a DNAN value for the specific data - * source. */ + note that this is not data source specific; this is + due to the bulk update, not a DNAN value for the + specific data source. */ rra_step_cnt[i] = 0; lookup_seasonal(&rrd, i, rra_start, rrd_file, elapsed_pdp_st, &last_seasonal_coef); @@ -1358,12 +1295,10 @@ int _rrd_update( (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) * sizeof(rrd_value_t); if (rra_pos_tmp != rra_current) { -#ifndef HAVE_MMAP if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) { rrd_set_error("seek error in rrd"); break; } -#endif rra_current = rra_pos_tmp; } #ifdef DEBUG @@ -1379,16 +1314,9 @@ int _rrd_update( 1) * rrd.rra_def[i].pdp_cnt * rrd.stat_head->pdp_step); } -#ifdef HAVE_MMAP pcdp_summary = - write_RRA_row(&rrd, i, &rra_current, scratch_idx, - rrd_file->fd, pcdp_summary, &rra_time, - rrd_file->file_start); -#else - pcdp_summary = - write_RRA_row(&rrd, i, &rra_current, scratch_idx, - rrd_file->fd, pcdp_summary, &rra_time); -#endif + write_RRA_row(rrd_file, &rrd, i, &rra_current, + scratch_idx, pcdp_summary, &rra_time); if (rrd_test_error()) break; @@ -1423,16 +1351,9 @@ int _rrd_update( 2) * rrd.rra_def[i].pdp_cnt * rrd.stat_head->pdp_step); } -#ifdef HAVE_MMAP - pcdp_summary = - write_RRA_row(&rrd, i, &rra_current, scratch_idx, - rrd_file->fd, pcdp_summary, &rra_time, - rrd_file->file_start); -#else pcdp_summary = - write_RRA_row(&rrd, i, &rra_current, scratch_idx, - rrd_file->fd, pcdp_summary, &rra_time); -#endif + write_RRA_row(rrd_file, &rrd, i, &rra_current, + scratch_idx, pcdp_summary, &rra_time); } if (rrd_test_error()) @@ -1459,21 +1380,13 @@ int _rrd_update( free(rra_step_cnt); rpnstack_free(&rpnstack); -#ifdef HAVE_MMAP - if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) { - rrd_set_error("error writing(unmapping) file: %s", filename); - } +#if 0 + //rrd_flush(rrd_file); //XXX: really needed? #endif /* if we got here and if there is an error and if the file has not been * written to, then close things up and return. */ if (rrd_test_error()) { - free(updvals); - free(tmpl_idx); - rrd_free(&rrd); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } /* aargh ... that was tough ... so many loops ... anyway, its done. @@ -1484,38 +1397,22 @@ int _rrd_update( + sizeof(rra_def_t) * rrd.stat_head->rra_cnt), SEEK_SET) != 0) { rrd_set_error("seek rrd for live header writeback"); - free(updvals); - free(tmpl_idx); - rrd_free(&rrd); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } - + /* for mmap, we did already write to the underlying mapping, so we do + not need to write again. */ +#ifndef HAVE_MMAP if (version >= 3) { if (rrd_write(rrd_file, rrd.live_head, sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) { rrd_set_error("rrd_write live_head to rrd"); - free(updvals); - rrd_free(&rrd); - free(tmpl_idx); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } } else { if (rrd_write(rrd_file, &rrd.live_head->last_up, sizeof(time_t) * 1) != sizeof(time_t) * 1) { rrd_set_error("rrd_write live_head to rrd"); - free(updvals); - rrd_free(&rrd); - free(tmpl_idx); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } } @@ -1524,13 +1421,7 @@ int _rrd_update( sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt) != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) { rrd_set_error("rrd_write pdp_prep to rrd"); - free(updvals); - rrd_free(&rrd); - free(tmpl_idx); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } if (rrd_write(rrd_file, rrd.cdp_prep, @@ -1540,27 +1431,16 @@ int _rrd_update( rrd.stat_head->ds_cnt)) { rrd_set_error("rrd_write cdp_prep to rrd"); - free(updvals); - free(tmpl_idx); - rrd_free(&rrd); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } if (rrd_write(rrd_file, rrd.rra_ptr, sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt) != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) { rrd_set_error("rrd_write rra_ptr to rrd"); - free(updvals); - free(tmpl_idx); - rrd_free(&rrd); - free(pdp_temp); - free(pdp_new); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } +#endif #ifdef HAVE_POSIX_FADVISExxx /* with update we have write ops, so they will probably not be done by now, this means @@ -1571,11 +1451,10 @@ int _rrd_update( if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) { rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename, rrd_strerror(errno)); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } #endif - /*XXX: ? */ rrd_flush(rrd_file); + /* rrd_flush(rrd_file); */ /* calling the smoothing code here guarantees at most * one smoothing operation per rrd_update call. Unfortunately, @@ -1583,8 +1462,6 @@ int _rrd_update( * for smoothing to occur off-schedule. This really isn't * critical except during the burning cycles. */ if (schedule_smooth) { -// in_file = fopen(filename,"rb+"); - rra_start = rra_begin; for (i = 0; i < rrd.stat_head->rra_cnt; ++i) { @@ -1606,30 +1483,34 @@ int _rrd_update( posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) { rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename, rrd_strerror(errno)); - close(rrd_file->fd); - return (-1); + goto err_free_pdp_new; } #endif - close(rrd_file->fd); - } - - /* OK now close the files and free the memory */ - if (close(rrd_file->fd) != 0) { - rrd_set_error("closing rrd"); - free(updvals); - free(tmpl_idx); - rrd_free(&rrd); - free(pdp_temp); - free(pdp_new); - return (-1); } rrd_free(&rrd); - free(updvals); - free(tmpl_idx); + rrd_close(rrd_file); + free(pdp_new); + free(tmpl_idx); free(pdp_temp); + free(updvals); return (0); + + err_free_pdp_new: + free(pdp_new); + err_free_tmpl_idx: + free(tmpl_idx); + err_free_pdp_temp: + free(pdp_temp); + err_free_updvals: + free(updvals); + err_close: + rrd_close(rrd_file); + err_free: + rrd_free(&rrd); + err_out: + return (-1); } /* @@ -1666,89 +1547,3 @@ int LockRRD( return (rcstat); } - - -#ifdef HAVE_MMAP -info_t - - - - - - - - - *write_RRA_row( - rrd_t *rrd, - unsigned long rra_idx, - unsigned long *rra_current, - unsigned short CDP_scratch_idx, -#ifndef DEBUG - int UNUSED(in_file), -#else - int in_file, -#endif - info_t *pcdp_summary, - time_t *rra_time, - void *rrd_mmaped_file) -#else -info_t - - - - - - - - - *write_RRA_row( - rrd_t *rrd, - unsigned long rra_idx, - unsigned long *rra_current, - unsigned short CDP_scratch_idx, - int in_file, - info_t *pcdp_summary, - time_t *rra_time) -#endif -{ - unsigned long ds_idx, cdp_idx; - infoval iv; - - for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) { - /* compute the cdp index */ - cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx; -#ifdef DEBUG - fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n", - rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val, - rrd_file->pos, rrd->rra_def[rra_idx].cf_nam); -#endif - if (pcdp_summary != NULL) { - iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val; - /* append info to the return hash */ - pcdp_summary = info_push(pcdp_summary, - sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", - *rra_time, - rrd->rra_def[rra_idx]. - cf_nam, - rrd->rra_def[rra_idx]. - pdp_cnt, - rrd->ds_def[ds_idx]. - ds_nam), RD_I_VAL, iv); - } -#ifdef HAVE_MMAP - memcpy((char *) rrd_mmaped_file + *rra_current, - &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val), - sizeof(rrd_value_t)); -#else - if (rrd_write - (rrd_file, - &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val), - sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) { - rrd_set_error("writing rrd"); - return 0; - } -#endif - *rra_current += sizeof(rrd_value_t); - } - return (pcdp_summary); -}