1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.13 2003/11/11 19:38:03 oetiker
9 * rrd files should NOT change size ever ... bulk update code wa buggy.
10 * -- David M. Grimes <dgrimes@navisite.com>
12 * Revision 1.12 2003/09/04 13:16:12 oetiker
13 * should not assigne but compare ... grrrrr
15 * Revision 1.11 2003/09/02 21:58:35 oetiker
16 * be pickier about what we accept in rrd_update. Complain if things do not work out
18 * Revision 1.10 2003/04/29 19:14:12 jake
19 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
20 * Also revert accidental addition of -I to aclocal MakeMakefile.
22 * Revision 1.9 2003/04/25 18:35:08 jake
23 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
25 * Revision 1.8 2003/03/31 21:22:12 oetiker
26 * enables RRDtool updates with microsecond or in case of windows millisecond
27 * precision. This is needed to reduce time measurement error when archive step
28 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
30 * Revision 1.7 2003/02/13 07:05:27 oetiker
31 * Find attached the patch I promised to send to you. Please note that there
32 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
33 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
34 * library is identical to librrd, but it contains support code for per-thread
35 * global variables currently used for error information only. This is similar
36 * to how errno per-thread variables are implemented. librrd_th must be linked
37 * alongside of libpthred
39 * There is also a new file "THREADS", holding some documentation.
41 * -- Peter Stamfest <peter@stamfest.at>
43 * Revision 1.6 2002/02/01 20:34:49 oetiker
44 * fixed version number and date/time
46 * Revision 1.5 2001/05/09 05:31:01 oetiker
47 * Bug fix: when update of multiple PDP/CDP RRAs coincided
48 * with interpolation of multiple PDPs an incorrect value was
49 * stored as the CDP. Especially evident for GAUGE data sources.
50 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
52 * Revision 1.4 2001/03/10 23:54:41 oetiker
53 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
54 * parser and calculator from rrd_graph and puts then in a new file,
55 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
56 * clean-up of aberrant behavior stuff, including a bug fix.
57 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
58 * -- Jake Brutlag <jakeb@corp.webtv.net>
60 * Revision 1.3 2001/03/04 13:01:55 oetiker
61 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
62 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
63 * This is backwards compatible! But new files using the Aberrant stuff are not readable
64 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
65 * -- Jake Brutlag <jakeb@corp.webtv.net>
67 * Revision 1.2 2001/03/04 11:14:25 oetiker
68 * added at-style-time@value:value syntax to rrd_update
69 * -- Dave Bodenstab <imdave@mcs.net>
71 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
74 *****************************************************************************/
77 #include <sys/types.h>
81 #include <sys/locking.h>
87 #include "rrd_rpncalc.h"
89 #include "rrd_is_thread_safe.h"
93 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
96 #include <sys/timeb.h>
99 time_t tv_sec; /* seconds */
100 long tv_usec; /* microseconds */
104 int tz_minuteswest; /* minutes W of Greenwich */
105 int tz_dsttime; /* type of dst correction */
108 static gettimeofday(struct timeval *t, struct __timezone *tz) {
110 struct timeb current_time;
112 _ftime(¤t_time);
114 t->tv_sec = current_time.time;
115 t->tv_usec = current_time.millitm * 1000;
120 * normilize time as returned by gettimeofday. usec part must
123 static void normalize_time(struct timeval *t)
127 t->tv_usec += 1000000L;
131 /* Local prototypes */
132 int LockRRD(FILE *rrd_file);
133 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
134 unsigned long *rra_current,
135 unsigned short CDP_scratch_idx, FILE *rrd_file,
136 info_t *pcdp_summary, time_t *rra_time);
137 int rrd_update_r(char *filename, char *template, int argc, char **argv);
138 int _rrd_update(char *filename, char *template, int argc, char **argv,
141 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
146 main(int argc, char **argv){
147 rrd_update(argc,argv);
148 if (rrd_test_error()) {
149 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
150 "Usage: rrdupdate filename\n"
151 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
152 "\t\t\ttime|N:value[:value...]\n\n"
153 "\t\t\tat-time@value[:value...]\n\n"
154 "\t\t\t[ time:value[:value...] ..]\n\n");
156 printf("ERROR: %s\n",rrd_get_error());
164 info_t *rrd_update_v(int argc, char **argv)
166 char *template = NULL;
167 info_t *result = NULL;
171 static struct option long_options[] =
173 {"template", required_argument, 0, 't'},
176 int option_index = 0;
178 opt = getopt_long(argc, argv, "t:",
179 long_options, &option_index);
190 rrd_set_error("unknown option '%s'",argv[optind-1]);
196 /* need at least 2 arguments: filename, data. */
197 if (argc-optind < 2) {
198 rrd_set_error("Not enough arguments");
202 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
203 rc.u_int = _rrd_update(argv[optind], template,
204 argc - optind - 1, argv + optind + 1, result);
205 result->value.u_int = rc.u_int;
211 rrd_update(int argc, char **argv)
213 char *template = NULL;
217 static struct option long_options[] =
219 {"template", required_argument, 0, 't'},
222 int option_index = 0;
224 opt = getopt_long(argc, argv, "t:",
225 long_options, &option_index);
236 rrd_set_error("unknown option '%s'",argv[optind-1]);
241 /* need at least 2 arguments: filename, data. */
242 if (argc-optind < 2) {
243 rrd_set_error("Not enough arguments");
248 rc = rrd_update_r(argv[optind], template,
249 argc - optind - 1, argv + optind + 1);
254 rrd_update_r(char *filename, char *template, int argc, char **argv)
256 return _rrd_update(filename, template, argc, argv, NULL);
260 _rrd_update(char *filename, char *template, int argc, char **argv,
261 info_t *pcdp_summary)
266 unsigned long i,ii,iii=1;
268 unsigned long rra_begin; /* byte pointer to the rra
269 * area in the rrd file. this
270 * pointer never changes value */
271 unsigned long rra_start; /* byte pointer to the rra
272 * area in the rrd file. this
273 * pointer changes as each rrd is
275 unsigned long rra_current; /* byte pointer to the current write
276 * spot in the rrd file. */
277 unsigned long rra_pos_tmp; /* temporary byte pointer. */
279 pre_int,post_int; /* interval between this and
281 unsigned long proc_pdp_st; /* which pdp_st was the last
283 unsigned long occu_pdp_st; /* when was the pdp_st
284 * before the last update
286 unsigned long proc_pdp_age; /* how old was the data in
287 * the pdp prep area when it
288 * was last updated */
289 unsigned long occu_pdp_age; /* how long ago was the last
291 rrd_value_t *pdp_new; /* prepare the incoming data
292 * to be added the the
294 rrd_value_t *pdp_temp; /* prepare the pdp values
295 * to be added the the
298 long *tmpl_idx; /* index representing the settings
299 transported by the template index */
300 unsigned long tmpl_cnt = 2; /* time and data */
305 time_t rra_time; /* time of update for a RRA */
306 unsigned long current_time_usec; /* microseconds part of current time */
307 struct timeval tmp_time; /* used for time conversion */
310 int schedule_smooth = 0;
311 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
312 /* a vector of future Holt-Winters seasonal coefs */
313 unsigned long elapsed_pdp_st;
314 /* number of elapsed PDP steps since last update */
315 unsigned long *rra_step_cnt = NULL;
316 /* number of rows to be updated in an RRA for a data
318 unsigned long start_pdp_offset;
319 /* number of PDP steps since the last update that
320 * are assigned to the first CDP to be generated
321 * since the last update. */
322 unsigned short scratch_idx;
323 /* index into the CDP scratch array */
324 enum cf_en current_cf;
325 /* numeric id of the current consolidation function */
326 rpnstack_t rpnstack; /* used for COMPUTE DS */
327 int version; /* rrd version */
328 char *endptr; /* used in the conversion */
330 rpnstack_init(&rpnstack);
332 /* need at least 1 arguments: data. */
334 rrd_set_error("Not enough arguments");
340 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
343 /* initialize time */
344 version = atoi(rrd.stat_head->version);
345 gettimeofday(&tmp_time, 0);
346 normalize_time(&tmp_time);
347 current_time = tmp_time.tv_sec;
349 current_time_usec = tmp_time.tv_usec;
352 current_time_usec = 0;
355 rra_current = rra_start = rra_begin = ftell(rrd_file);
356 /* This is defined in the ANSI C standard, section 7.9.5.3:
358 When a file is opened with udpate mode ('+' as the second
359 or third character in the ... list of mode argument
360 variables), both input and ouptut may be performed on the
361 associated stream. However, ... input may not be directly
362 followed by output without an intervening call to a file
363 positioning function, unless the input oepration encounters
365 fseek(rrd_file, 0, SEEK_CUR);
368 /* get exclusive lock to whole file.
369 * lock gets removed when we close the file.
371 if (LockRRD(rrd_file) != 0) {
372 rrd_set_error("could not lock RRD");
378 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
379 rrd_set_error("allocating updvals pointer array");
385 if ((pdp_temp = malloc(sizeof(rrd_value_t)
386 *rrd.stat_head->ds_cnt))==NULL){
387 rrd_set_error("allocating pdp_temp ...");
394 if ((tmpl_idx = malloc(sizeof(unsigned long)
395 *(rrd.stat_head->ds_cnt+1)))==NULL){
396 rrd_set_error("allocating tmpl_idx ...");
403 /* initialize template redirector */
404 /* default config example (assume DS 1 is a CDEF DS)
405 tmpl_idx[0] -> 0; (time)
406 tmpl_idx[1] -> 1; (DS 0)
407 tmpl_idx[2] -> 3; (DS 2)
408 tmpl_idx[3] -> 4; (DS 3) */
409 tmpl_idx[0] = 0; /* time */
410 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
412 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
419 unsigned int tmpl_len;
421 tmpl_cnt = 1; /* the first entry is the time */
422 tmpl_len = strlen(template);
423 for(i=0;i<=tmpl_len ;i++) {
424 if (template[i] == ':' || template[i] == '\0') {
426 if (tmpl_cnt>rrd.stat_head->ds_cnt){
427 rrd_set_error("Template contains more DS definitions than RRD");
428 free(updvals); free(pdp_temp);
429 free(tmpl_idx); rrd_free(&rrd);
430 fclose(rrd_file); return(-1);
432 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
433 rrd_set_error("unknown DS name '%s'",dsname);
434 free(updvals); free(pdp_temp);
435 free(tmpl_idx); rrd_free(&rrd);
436 fclose(rrd_file); return(-1);
438 /* the first element is always the time */
439 tmpl_idx[tmpl_cnt-1]++;
440 /* go to the next entry on the template */
441 dsname = &template[i+1];
442 /* fix the damage we did before */
451 if ((pdp_new = malloc(sizeof(rrd_value_t)
452 *rrd.stat_head->ds_cnt))==NULL){
453 rrd_set_error("allocating pdp_new ...");
462 /* loop through the arguments. */
463 for(arg_i=0; arg_i<argc;arg_i++) {
464 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
465 char *step_start = stepper;
467 char *parsetime_error = NULL;
468 enum {atstyle, normal} timesyntax;
469 struct time_value ds_tv;
470 if (stepper == NULL){
471 rrd_set_error("failed duplication argv entry");
479 /* initialize all ds input to unknown except the first one
480 which has always got to be set */
481 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
482 strcpy(stepper,argv[arg_i]);
484 /* separate all ds elements; first must be examined separately
485 due to alternate time syntax */
486 if ((p=strchr(stepper,'@'))!=NULL) {
487 timesyntax = atstyle;
490 } else if ((p=strchr(stepper,':'))!=NULL) {
495 rrd_set_error("expected timestamp not found in data source from %s:...",
501 updvals[tmpl_idx[ii]] = stepper;
503 if (*stepper == ':') {
507 updvals[tmpl_idx[ii]] = stepper+1;
513 if (ii != tmpl_cnt-1) {
514 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
515 tmpl_cnt-1, ii, argv[arg_i]);
520 /* get the time from the reading ... handle N */
521 if (timesyntax == atstyle) {
522 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
523 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
527 if (ds_tv.type == RELATIVE_TO_END_TIME ||
528 ds_tv.type == RELATIVE_TO_START_TIME) {
529 rrd_set_error("specifying time relative to the 'start' "
530 "or 'end' makes no sense here: %s",
536 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
537 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
539 } else if (strcmp(updvals[0],"N")==0){
540 gettimeofday(&tmp_time, 0);
541 normalize_time(&tmp_time);
542 current_time = tmp_time.tv_sec;
543 current_time_usec = tmp_time.tv_usec;
546 tmp = strtod(updvals[0], 0);
547 current_time = floor(tmp);
548 current_time_usec = (long)((tmp - current_time) * 1000000L);
550 /* dont do any correction for old version RRDs */
552 current_time_usec = 0;
554 if(current_time <= rrd.live_head->last_up){
555 rrd_set_error("illegal attempt to update using time %ld when "
556 "last update time is %ld (minimum one second step)",
557 current_time, rrd.live_head->last_up);
563 /* seek to the beginning of the rra's */
564 if (rra_current != rra_begin) {
565 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
566 rrd_set_error("seek error in rrd");
570 rra_current = rra_begin;
572 rra_start = rra_begin;
574 /* when was the current pdp started */
575 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
576 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
578 /* when did the last pdp_st occur */
579 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
580 occu_pdp_st = current_time - occu_pdp_age;
581 /* interval = current_time - rrd.live_head->last_up; */
582 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
584 if (occu_pdp_st > proc_pdp_st){
585 /* OK we passed the pdp_st moment*/
586 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
587 * occurred before the latest
589 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
590 post_int = occu_pdp_age; /* how much after it */
591 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
605 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
606 occu_pdp_age, occu_pdp_st,
607 interval, pre_int, post_int);
610 /* process the data sources and update the pdp_prep
611 * area accordingly */
612 for(i=0;i<rrd.stat_head->ds_cnt;i++){
614 dst_idx= dst_conv(rrd.ds_def[i].dst);
615 /* NOTE: DST_CDEF should never enter this if block, because
616 * updvals[i+1][0] is initialized to 'U'; unless the caller
617 * accidently specified a value for the DST_CDEF. To handle
618 * this case, an extra check is required. */
619 if((updvals[i+1][0] != 'U') &&
620 (dst_idx != DST_CDEF) &&
621 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
623 /* the data source type defines how to process the data */
624 /* pdp_new contains rate * time ... eg the bytes
625 * transferred during the interval. Doing it this way saves
626 * a lot of math operations */
632 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
633 for(ii=0;updvals[i+1][ii] != '\0';ii++){
634 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
635 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639 if (rrd_test_error()){
642 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
643 if(dst_idx == DST_COUNTER) {
644 /* simple overflow catcher sugestet by andres kroonmaa */
645 /* this will fail terribly for non 32 or 64 bit counters ... */
646 /* are there any others in SNMP land ? */
647 if (pdp_new[i] < (double)0.0 )
648 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
649 if (pdp_new[i] < (double)0.0 )
650 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
652 rate = pdp_new[i] / interval;
660 pdp_new[i] = strtod(updvals[i+1],&endptr);
662 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
665 if (endptr[0] != '\0'){
666 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
669 rate = pdp_new[i] / interval;
673 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
675 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
678 if (endptr[0] != '\0'){
679 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
682 rate = pdp_new[i] / interval;
685 rrd_set_error("rrd contains unknown DS type : '%s'",
689 /* break out of this for loop if the error string is set */
690 if (rrd_test_error()){
693 /* make sure pdp_temp is neither too large or too small
694 * if any of these occur it becomes unknown ...
696 if ( ! isnan(rate) &&
697 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
698 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
699 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
700 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704 /* no news is news all the same */
708 /* make a copy of the command line argument for the next run */
716 rrd.pdp_prep[i].last_ds,
717 updvals[i+1], pdp_new[i]);
719 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
720 strncpy(rrd.pdp_prep[i].last_ds,
721 updvals[i+1],LAST_DS_LEN-1);
722 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
725 /* break out of the argument parsing loop if the error_string is set */
726 if (rrd_test_error()){
730 /* has a pdp_st moment occurred since the last run ? */
732 if (proc_pdp_st == occu_pdp_st){
733 /* no we have not passed a pdp_st moment. therefore update is simple */
735 for(i=0;i<rrd.stat_head->ds_cnt;i++){
736 if(isnan(pdp_new[i]))
737 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
739 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
746 rrd.pdp_prep[i].scratch[PDP_val].u_val,
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
751 /* an pdp_st has occurred. */
753 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
754 * occurred up to the last run.
755 pdp_new[] contains rate*seconds from the latest run.
756 pdp_temp[] will contain the rate for cdp */
758 for(i=0;i<rrd.stat_head->ds_cnt;i++){
759 /* update pdp_prep to the current pdp_st */
760 if(isnan(pdp_new[i]))
761 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
763 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764 pdp_new[i]/(double)interval*(double)pre_int;
766 /* if too much of the pdp_prep is unknown we dump it */
767 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
768 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
769 (occu_pdp_st-proc_pdp_st <=
770 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
773 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
774 / (double)( occu_pdp_st
776 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
779 /* process CDEF data sources; remember each CDEF DS can
780 * only reference other DS with a lower index number */
781 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
783 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
784 /* substitue data values for OP_VARIABLE nodes */
785 for (ii = 0; rpnp[ii].op != OP_END; ii++)
787 if (rpnp[ii].op == OP_VARIABLE) {
788 rpnp[ii].op = OP_NUMBER;
789 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
792 /* run the rpn calculator */
793 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
795 break; /* exits the data sources pdp_temp loop */
799 /* make pdp_prep ready for the next run */
800 if(isnan(pdp_new[i])){
801 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
802 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
804 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
805 rrd.pdp_prep[i].scratch[PDP_val].u_val =
806 pdp_new[i]/(double)interval*(double)post_int;
814 "new_unkn_sec %5lu\n",
816 rrd.pdp_prep[i].scratch[PDP_val].u_val,
817 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
821 /* if there were errors during the last loop, bail out here */
822 if (rrd_test_error()){
827 /* compute the number of elapsed pdp_st moments */
828 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
830 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
832 if (rra_step_cnt == NULL)
834 rra_step_cnt = (unsigned long *)
835 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
838 for(i = 0, rra_start = rra_begin;
839 i < rrd.stat_head->rra_cnt;
840 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
843 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
844 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
845 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
846 if (start_pdp_offset <= elapsed_pdp_st) {
847 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
848 rrd.rra_def[i].pdp_cnt + 1;
853 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
855 /* If this is a bulk update, we need to skip ahead in the seasonal
856 * arrays so that they will be correct for the next observed value;
857 * note that for the bulk update itself, no update will occur to
858 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
860 if (rra_step_cnt[i] > 2)
862 /* skip update by resetting rra_step_cnt[i],
863 * note that this is not data source specific; this is due
864 * to the bulk update, not a DNAN value for the specific data
867 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
868 &last_seasonal_coef);
869 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
873 /* periodically run a smoother for seasonal effects */
874 /* Need to use first cdp parameter buffer to track
875 * burnin (burnin requires a specific smoothing schedule).
876 * The CDP_init_seasonal parameter is really an RRA level,
877 * not a data source within RRA level parameter, but the rra_def
878 * is read only for rrd_update (not flushed to disk). */
879 iii = i*(rrd.stat_head -> ds_cnt);
880 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
883 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
884 > rrd.rra_def[i].row_cnt - 1) {
885 /* mark off one of the burnin cycles */
886 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
890 /* someone has no doubt invented a trick to deal with this
891 * wrap around, but at least this code is clear. */
892 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
893 rrd.rra_ptr[i].cur_row)
895 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
896 * mapping between PDP and CDP */
897 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
902 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
904 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
909 /* can't rely on negative numbers because we are working with
911 /* Don't need modulus here. If we've wrapped more than once, only
912 * one smooth is executed at the end. */
913 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
914 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
915 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
919 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
920 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
921 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
928 rra_current = ftell(rrd_file);
929 } /* if cf is DEVSEASONAL or SEASONAL */
931 if (rrd_test_error()) break;
933 /* update CDP_PREP areas */
934 /* loop over data soures within each RRA */
936 ii < rrd.stat_head->ds_cnt;
940 /* iii indexes the CDP prep area for this data source within the RRA */
941 iii=i*rrd.stat_head->ds_cnt+ii;
943 if (rrd.rra_def[i].pdp_cnt > 1) {
945 if (rra_step_cnt[i] > 0) {
946 /* If we are in this block, as least 1 CDP value will be written to
947 * disk, this is the CDP_primary_val entry. If more than 1 value needs
948 * to be written, then the "fill in" value is the CDP_secondary_val
950 if (isnan(pdp_temp[ii]))
952 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
953 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
955 /* CDP_secondary value is the RRA "fill in" value for intermediary
956 * CDP data entries. No matter the CF, the value is the same because
957 * the average, max, min, and last of a list of identical values is
958 * the same, namely, the value itself. */
959 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
962 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
963 > rrd.rra_def[i].pdp_cnt*
964 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
966 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
967 /* initialize carry over */
968 if (current_cf == CF_AVERAGE) {
969 if (isnan(pdp_temp[ii])) {
970 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
972 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
973 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
976 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
979 rrd_value_t cum_val, cur_val;
980 switch (current_cf) {
982 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
983 cur_val = IFDNAN(pdp_temp[ii],0.0);
984 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
985 (cum_val + cur_val * start_pdp_offset) /
986 (rrd.rra_def[i].pdp_cnt
987 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
988 /* initialize carry over value */
989 if (isnan(pdp_temp[ii])) {
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
992 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
993 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
997 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
998 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1000 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1001 isnan(pdp_temp[ii])) {
1003 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1008 if (cur_val > cum_val)
1009 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1012 /* initialize carry over value */
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1017 cur_val = IFDNAN(pdp_temp[ii],DINF);
1019 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1020 isnan(pdp_temp[ii])) {
1022 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1027 if (cur_val < cum_val)
1028 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1030 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1031 /* initialize carry over value */
1032 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1036 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1037 /* initialize carry over value */
1038 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1041 } /* endif meets xff value requirement for a valid value */
1042 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1043 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1044 if (isnan(pdp_temp[ii]))
1045 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1046 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1048 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1049 } else /* rra_step_cnt[i] == 0 */
1052 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1053 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1056 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1057 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1060 if (isnan(pdp_temp[ii])) {
1061 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1062 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1064 if (current_cf == CF_AVERAGE) {
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1071 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1072 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1075 switch (current_cf) {
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1081 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1082 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1085 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1095 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1096 if (elapsed_pdp_st > 2)
1098 switch (current_cf) {
1101 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1102 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1105 case CF_DEVSEASONAL:
1106 /* need to update cached seasonal values, so they are consistent
1107 * with the bulk update */
1108 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1109 * CDP_last_deviation are the same. */
1110 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1111 last_seasonal_coef[ii];
1112 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1116 /* need to update the null_count and last_null_count.
1117 * even do this for non-DNAN pdp_temp because the
1118 * algorithm is not learning from batch updates. */
1119 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1121 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1125 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1126 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1129 /* do not count missed bulk values as failures */
1130 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1131 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1132 /* need to reset violations buffer.
1133 * could do this more carefully, but for now, just
1134 * assume a bulk update wipes away all violations. */
1135 erase_violations(&rrd, iii, i);
1139 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1141 if (rrd_test_error()) break;
1143 } /* endif data sources loop */
1144 } /* end RRA Loop */
1146 /* this loop is only entered if elapsed_pdp_st < 3 */
1147 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1148 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1150 for(i = 0, rra_start = rra_begin;
1151 i < rrd.stat_head->rra_cnt;
1152 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1155 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1157 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1158 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1160 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1161 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1163 rra_current = ftell(rrd_file);
1165 if (rrd_test_error()) break;
1166 /* loop over data soures within each RRA */
1168 ii < rrd.stat_head->ds_cnt;
1171 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1172 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1173 scratch_idx, seasonal_coef);
1175 } /* end RRA Loop */
1176 if (rrd_test_error()) break;
1177 } /* end elapsed_pdp_st loop */
1179 if (rrd_test_error()) break;
1181 /* Ready to write to disk */
1182 /* Move sequentially through the file, writing one RRA at a time.
1183 * Note this architecture divorces the computation of CDP with
1184 * flushing updated RRA entries to disk. */
1185 for(i = 0, rra_start = rra_begin;
1186 i < rrd.stat_head->rra_cnt;
1187 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1189 /* is there anything to write for this RRA? If not, continue. */
1190 if (rra_step_cnt[i] == 0) continue;
1192 /* write the first row */
1194 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1196 rrd.rra_ptr[i].cur_row++;
1197 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1198 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1199 /* positition on the first row */
1200 rra_pos_tmp = rra_start +
1201 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1202 if(rra_pos_tmp != rra_current) {
1203 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1204 rrd_set_error("seek error in rrd");
1207 rra_current = rra_pos_tmp;
1211 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1213 scratch_idx = CDP_primary_val;
1214 if (pcdp_summary != NULL)
1216 rra_time = (current_time - current_time
1217 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1218 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1220 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1221 pcdp_summary, &rra_time);
1222 if (rrd_test_error()) break;
1224 /* write other rows of the bulk update, if any */
1225 scratch_idx = CDP_secondary_val;
1226 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1228 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1231 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1232 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1235 rrd.rra_ptr[i].cur_row = 0;
1236 /* seek back to beginning of current rra */
1237 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1239 rrd_set_error("seek error in rrd");
1243 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1245 rra_current = rra_start;
1247 if (pcdp_summary != NULL)
1249 rra_time = (current_time - current_time
1250 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1251 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1253 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1254 pcdp_summary, &rra_time);
1257 if (rrd_test_error())
1261 /* break out of the argument parsing loop if error_string is set */
1262 if (rrd_test_error()){
1267 } /* endif a pdp_st has occurred */
1268 rrd.live_head->last_up = current_time;
1269 rrd.live_head->last_up_usec = current_time_usec;
1271 } /* function argument loop */
1273 if (seasonal_coef != NULL) free(seasonal_coef);
1274 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1275 if (rra_step_cnt != NULL) free(rra_step_cnt);
1276 rpnstack_free(&rpnstack);
1278 /* if we got here and if there is an error and if the file has not been
1279 * written to, then close things up and return. */
1280 if (rrd_test_error()) {
1290 /* aargh ... that was tough ... so many loops ... anyway, its done.
1291 * we just need to write back the live header portion now*/
1293 if (fseek(rrd_file, (sizeof(stat_head_t)
1294 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1295 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1297 rrd_set_error("seek rrd for live header writeback");
1308 if(fwrite( rrd.live_head,
1309 sizeof(live_head_t), 1, rrd_file) != 1){
1310 rrd_set_error("fwrite live_head to rrd");
1321 if(fwrite( &rrd.live_head->last_up,
1322 sizeof(time_t), 1, rrd_file) != 1){
1323 rrd_set_error("fwrite live_head to rrd");
1335 if(fwrite( rrd.pdp_prep,
1337 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338 rrd_set_error("ftwrite pdp_prep to rrd");
1348 if(fwrite( rrd.cdp_prep,
1350 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1351 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1353 rrd_set_error("ftwrite cdp_prep to rrd");
1363 if(fwrite( rrd.rra_ptr,
1365 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366 rrd_set_error("fwrite rra_ptr to rrd");
1376 /* OK now close the files and free the memory */
1377 if(fclose(rrd_file) != 0){
1378 rrd_set_error("closing rrd");
1387 /* calling the smoothing code here guarantees at most
1388 * one smoothing operation per rrd_update call. Unfortunately,
1389 * it is possible with bulk updates, or a long-delayed update
1390 * for smoothing to occur off-schedule. This really isn't
1391 * critical except during the burning cycles. */
1392 if (schedule_smooth)
1395 rrd_file = fopen(filename,"r+");
1397 rrd_file = fopen(filename,"rb+");
1399 rra_start = rra_begin;
1400 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1402 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1403 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1406 fprintf(stderr,"Running smoother for rra %ld\n",i);
1408 apply_smoother(&rrd,i,rra_start,rrd_file);
1409 if (rrd_test_error())
1412 rra_start += rrd.rra_def[i].row_cnt
1413 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1426 * get exclusive lock to whole file.
1427 * lock gets removed when we close the file
1429 * returns 0 on success
1432 LockRRD(FILE *rrdfile)
1434 int rrd_fd; /* File descriptor for RRD */
1437 rrd_fd = fileno(rrdfile);
1442 lock.l_type = F_WRLCK; /* exclusive write lock */
1443 lock.l_len = 0; /* whole file */
1444 lock.l_start = 0; /* start of file */
1445 lock.l_whence = SEEK_SET; /* end of file */
1447 stat = fcntl(rrd_fd, F_SETLK, &lock);
1451 if ( _fstat( rrd_fd, &st ) == 0 ) {
1452 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1464 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1465 unsigned short CDP_scratch_idx, FILE *rrd_file,
1466 info_t *pcdp_summary, time_t *rra_time)
1468 unsigned long ds_idx, cdp_idx;
1471 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1473 /* compute the cdp index */
1474 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1476 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1477 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1478 rrd -> rra_def[rra_idx].cf_nam);
1480 if (pcdp_summary != NULL)
1482 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1483 /* append info to the return hash */
1484 pcdp_summary = info_push(pcdp_summary,
1485 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1486 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1487 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1490 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1491 sizeof(rrd_value_t),1,rrd_file) != 1)
1493 rrd_set_error("writing rrd");
1496 *rra_current += sizeof(rrd_value_t);
1498 return (pcdp_summary);