1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.2 2001/03/04 11:14:25 oetiker
9 * added at-style-time@value:value syntax to rrd_update
10 * -- Dave Bodenstab <imdave@mcs.net>
12 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
15 *****************************************************************************/
18 #include <sys/types.h>
22 #include <sys/locking.h>
29 int LockRRD(FILE *rrd_file);
36 main(int argc, char **argv){
37 rrd_update(argc,argv);
38 if (rrd_test_error()) {
39 printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
40 "Usage: rrdupdate filename\n"
41 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
42 "\t\t\ttime|N:value[:value...]\n\n"
43 "\t\t\tat-time@value[:value...]\n\n"
44 "\t\t\t[ time:value[:value...] ..]\n\n");
46 printf("ERROR: %s\n",rrd_get_error());
55 rrd_update(int argc, char **argv)
61 unsigned long rra_begin; /* byte pointer to the rra
62 * area in the rrd file. this
63 * pointer never changes value */
64 unsigned long rra_start; /* byte pointer to the rra
65 * area in the rrd file. this
66 * pointer changes as each rrd is
68 unsigned long rra_current; /* byte pointer to the current write
69 * spot in the rrd file. */
70 unsigned long rra_pos_tmp; /* temporary byte pointer. */
71 unsigned long interval,
72 pre_int,post_int; /* interval between this and
74 unsigned long proc_pdp_st; /* which pdp_st was the last
76 unsigned long occu_pdp_st; /* when was the pdp_st
77 * before the last update
79 unsigned long proc_pdp_age; /* how old was the data in
80 * the pdp prep area when it
82 unsigned long occu_pdp_age; /* how long ago was the last
84 unsigned long pdp_st; /* helper for cdp_prep
86 rrd_value_t *pdp_new; /* prepare the incoming data
89 rrd_value_t *pdp_temp; /* prepare the pdp values
93 long *tmpl_idx; /* index representing the settings
94 transported by the template index */
95 long tmpl_cnt = 2; /* time and data */
99 time_t current_time = time(NULL);
101 int wrote_to_file = 0;
102 char *template = NULL;
106 static struct option long_options[] =
108 {"template", required_argument, 0, 't'},
111 int option_index = 0;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
131 /* need at least 2 arguments: filename, data. */
132 if (argc-optind < 2) {
133 rrd_set_error("Not enough arguments");
137 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
140 rra_current = rra_start = rra_begin = ftell(rrd_file);
141 /* This is defined in the ANSI C standard, section 7.9.5.3:
143 When a file is opened with udpate mode ('+' as the second
144 or third character in the ... list of mode argument
145 variables), both input and ouptut may be performed on the
146 associated stream. However, ... input may not be directly
147 followed by output without an intervening call to a file
148 positioning function, unless the input oepration encounters
150 fseek(rrd_file, 0, SEEK_CUR);
153 /* get exclusive lock to whole file.
154 * lock gets removed when we close the file.
156 if (LockRRD(rrd_file) != 0) {
157 rrd_set_error("could not lock RRD");
163 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
164 rrd_set_error("allocating updvals pointer array");
170 if ((pdp_temp = malloc(sizeof(rrd_value_t)
171 *rrd.stat_head->ds_cnt))==NULL){
172 rrd_set_error("allocating pdp_temp ...");
179 if ((tmpl_idx = malloc(sizeof(unsigned long)
180 *(rrd.stat_head->ds_cnt+1)))==NULL){
181 rrd_set_error("allocating tmpl_idx ...");
188 /* initialize template redirector */
190 tmpl_idx[0] -> 0; (time)
191 tmpl_idx[1] -> 1; (DS 0)
192 tmpl_idx[2] -> 2; (DS 1)
193 tmpl_idx[3] -> 3; (DS 2)
195 for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
196 tmpl_cnt=rrd.stat_head->ds_cnt+1;
201 tmpl_cnt = 1; /* the first entry is the time */
202 tmpl_len = strlen(template);
203 for(i=0;i<=tmpl_len ;i++) {
204 if (template[i] == ':' || template[i] == '\0') {
206 if (tmpl_cnt>rrd.stat_head->ds_cnt){
207 rrd_set_error("Template contains more DS definitions than RRD");
208 free(updvals); free(pdp_temp);
209 free(tmpl_idx); rrd_free(&rrd);
210 fclose(rrd_file); return(-1);
212 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
213 rrd_set_error("unknown DS name '%s'",dsname);
214 free(updvals); free(pdp_temp);
215 free(tmpl_idx); rrd_free(&rrd);
216 fclose(rrd_file); return(-1);
218 /* the first element is always the time */
219 tmpl_idx[tmpl_cnt-1]++;
220 /* go to the next entry on the template */
221 dsname = &template[i+1];
222 /* fix the damage we did before */
231 if ((pdp_new = malloc(sizeof(rrd_value_t)
232 *rrd.stat_head->ds_cnt))==NULL){
233 rrd_set_error("allocating pdp_new ...");
242 /* loop through the arguments. */
243 for(arg_i=optind+1; arg_i<argc;arg_i++) {
244 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
245 char *step_start = stepper;
247 char *parsetime_error = NULL;
248 enum {atstyle, normal} timesyntax;
249 struct time_value ds_tv;
250 if (stepper == NULL){
251 rrd_set_error("faild duplication argv entry");
259 /* initialize all ds input to unknown except the first one
260 which has always got to be set */
261 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
262 strcpy(stepper,argv[arg_i]);
264 /* separate all ds elements; first must be examined separately
265 due to alternate time syntax */
266 if ((p=strchr(stepper,'@'))!=NULL) {
267 timesyntax = atstyle;
270 } else if ((p=strchr(stepper,':'))!=NULL) {
275 rrd_set_error("expected timestamp not found in data source from %s:...",
281 updvals[tmpl_idx[ii]] = stepper;
283 if (*stepper == ':') {
287 updvals[tmpl_idx[ii]] = stepper+1;
293 if (ii != tmpl_cnt-1) {
294 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
295 tmpl_cnt-1, ii, argv[arg_i]);
300 /* get the time from the reading ... handle N */
301 if (timesyntax == atstyle) {
302 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
303 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
307 if (ds_tv.type == RELATIVE_TO_END_TIME ||
308 ds_tv.type == RELATIVE_TO_START_TIME) {
309 rrd_set_error("specifying time relative to the 'start' "
310 "or 'end' makes no sense here: %s",
316 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
317 } else if (strcmp(updvals[0],"N")==0){
318 current_time = time(NULL);
320 current_time = atol(updvals[0]);
323 if(current_time <= rrd.live_head->last_up){
324 rrd_set_error("illegal attempt to update using time %ld when "
325 "last update time is %ld (minimum one second step)",
326 current_time, rrd.live_head->last_up);
332 /* seek to the beginning of the rrd's */
333 if (rra_current != rra_begin) {
334 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
335 rrd_set_error("seek error in rrd");
339 rra_current = rra_begin;
341 rra_start = rra_begin;
343 /* when was the current pdp started */
344 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
345 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
347 /* when did the last pdp_st occur */
348 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
349 occu_pdp_st = current_time - occu_pdp_age;
350 interval = current_time - rrd.live_head->last_up;
352 if (occu_pdp_st > proc_pdp_st){
353 /* OK we passed the pdp_st moment*/
354 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
355 * occurred before the latest
357 post_int = occu_pdp_age; /* how much after it */
371 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
372 occu_pdp_age, occu_pdp_st,
373 interval, pre_int, post_int);
376 /* process the data sources and update the pdp_prep
377 * area accordingly */
378 for(i=0;i<rrd.stat_head->ds_cnt;i++){
380 dst_idx= dst_conv(rrd.ds_def[i].dst);
381 if((updvals[i+1][0] != 'U') &&
382 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
384 /* the data source type defines how to process the data */
385 /* pdp_temp contains rate * time ... eg the bytes
386 * transferred during the interval. Doing it this way saves
387 * a lot of math operations */
393 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
394 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
395 if(dst_idx == DST_COUNTER) {
396 /* simple overflow catcher sugestet by andres kroonmaa */
397 /* this will fail terribly for non 32 or 64 bit counters ... */
398 /* are there any others in SNMP land ? */
399 if (pdp_new[i] < (double)0.0 )
400 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
401 if (pdp_new[i] < (double)0.0 )
402 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
404 rate = pdp_new[i] / interval;
411 pdp_new[i]= atof(updvals[i+1]);
412 rate = pdp_new[i] / interval;
415 pdp_new[i] = atof(updvals[i+1]) * interval;
416 rate = pdp_new[i] / interval;
419 rrd_set_error("rrd contains unknown DS type : '%s'",
423 /* break out of this for loop if the error string is set */
424 if (rrd_test_error()){
427 /* make sure pdp_temp is neither too large or too small
428 * if any of these occur it becomes unknown ...
430 if ( ! isnan(rate) &&
431 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
432 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
433 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
434 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
438 /* no news is news all the same */
442 /* make a copy of the command line argument for the next run */
450 rrd.pdp_prep[i].last_ds,
451 updvals[i+1], pdp_new[i]);
453 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
454 strncpy(rrd.pdp_prep[i].last_ds,
455 updvals[i+1],LAST_DS_LEN-1);
456 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
459 /* break out of the argument parsing loop if the error_string is set */
460 if (rrd_test_error()){
464 /* has a pdp_st moment occurred since the last run ? */
466 if (proc_pdp_st == occu_pdp_st){
467 /* no we have not passed a pdp_st moment. therefore update is simple */
469 for(i=0;i<rrd.stat_head->ds_cnt;i++){
470 if(isnan(pdp_new[i]))
471 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
473 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
480 rrd.pdp_prep[i].scratch[PDP_val].u_val,
481 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
485 /* an pdp_st has occurred. */
487 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
488 * occurred up to the last run.
489 pdp_new[] contains rate*seconds from the latest run.
490 pdp_temp[] will contain the rate for cdp */
493 for(i=0;i<rrd.stat_head->ds_cnt;i++){
494 /* update pdp_prep to the current pdp_st */
495 if(isnan(pdp_new[i]))
496 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
498 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
499 pdp_new[i]/(double)interval*(double)pre_int;
501 /* if too much of the pdp_prep is unknown we dump it */
502 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
503 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
504 (occu_pdp_st-proc_pdp_st <=
505 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
508 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
509 / (double)( occu_pdp_st
511 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
513 /* make pdp_prep ready for the next run */
514 if(isnan(pdp_new[i])){
515 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
516 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
518 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
519 rrd.pdp_prep[i].scratch[PDP_val].u_val =
520 pdp_new[i]/(double)interval*(double)post_int;
528 "new_unkn_sec %5lu\n",
530 rrd.pdp_prep[i].scratch[PDP_val].u_val,
531 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
536 /* now we have to integrate this data into the cdp_prep areas */
537 /* going through the round robin archives */
539 i < rrd.stat_head->rra_cnt;
541 enum cf_en current_cf = cf_conv(rrd.rra_def[i].cf_nam);
542 /* going through all pdp_st moments which have occurred
543 * since the last run */
544 for(pdp_st = proc_pdp_st+rrd.stat_head->pdp_step;
545 pdp_st <= occu_pdp_st;
546 pdp_st += rrd.stat_head->pdp_step){
549 fprintf(stderr,"RRA %lu STEP %lu\n",i,pdp_st);
553 (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
555 /* later on the cdp_prep values will be transferred to
556 * the rra. we want to be in the right place. */
557 rrd.rra_ptr[i].cur_row++;
558 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
559 /* oops ... we have to wrap the beast ... */
560 rrd.rra_ptr[i].cur_row=0;
562 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
564 /* determine if a seek is even needed. */
565 rra_pos_tmp = rra_start +
566 rrd.stat_head->ds_cnt*rrd.rra_ptr[i].cur_row*sizeof(rrd_value_t);
567 if(rra_pos_tmp != rra_current) {
568 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
569 rrd_set_error("seek error in rrd");
572 rra_current = rra_pos_tmp;
575 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
580 ii < rrd.stat_head->ds_cnt;
582 iii=i*rrd.stat_head->ds_cnt+ii;
584 /* the contents of cdp_prep[].scratch[CDP_val].u_val depends
585 * on the consolidation function ! */
587 if (isnan(pdp_temp[ii])){ /* pdp is unknown */
588 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt++;
590 fprintf(stderr," ** UNKNOWN ADD %lu\n",
591 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
594 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)){
595 /* cdp_prep is unknown when it does not
596 * yet contain data. It can not be zero for
597 * things like mim and max consolidation
600 fprintf(stderr," ** INIT CDP %e\n", pdp_temp[ii]);
602 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
607 rrd.cdp_prep[iii].scratch[CDP_val].u_val+=pdp_temp[ii];
609 fprintf(stderr," ** AVERAGE %e\n",
610 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
614 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
615 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
617 fprintf(stderr," ** MINIMUM %e\n",
618 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
622 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
623 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
625 fprintf(stderr," ** MAXIMUM %e\n",
626 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
630 rrd.cdp_prep[iii].scratch[CDP_val].u_val=pdp_temp[ii];
632 fprintf(stderr," ** LAST %e\n",
633 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
637 rrd_set_error("Unknown cf %s",
638 rrd.rra_def[i].cf_nam);
645 /* is the data in the cdp_prep ready to go into
648 (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
650 /* prepare cdp_pref for its transition to the rra. */
651 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
652 > rrd.rra_def[i].pdp_cnt*
653 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
654 /* to much of the cdp_prep is unknown ... */
655 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
656 else if (current_cf == CF_AVERAGE){
657 /* for a real average we have to divide
658 * the sum we built earlier on. While ignoring
659 * the unknown pdps */
660 rrd.cdp_prep[iii].scratch[CDP_val].u_val
661 /= (rrd.rra_def[i].pdp_cnt
662 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
664 /* we can write straight away, because we are
665 * already in the right place ... */
668 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld\n",
669 rrd.cdp_prep[iii].scratch[CDP_val].u_val,ftell(rrd_file));
672 if(fwrite(&(rrd.cdp_prep[iii].scratch[CDP_val].u_val),
673 sizeof(rrd_value_t),1,rrd_file) != 1){
674 rrd_set_error("writing rrd");
677 rra_current += sizeof(rrd_value_t);
681 fprintf(stderr," -- RRA WROTE new at %ld\n",ftell(rrd_file));
684 /* make cdp_prep ready for the next run */
685 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
686 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
689 /* break out of this loop if error_string has been set */
690 if (rrd_test_error())
693 /* break out of this loop if error_string has been set */
694 if (rrd_test_error())
696 /* to be able to position correctly in the next rra w move
697 * the rra_start pointer on to the next rra */
698 rra_start += rrd.rra_def[i].row_cnt
699 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
702 /* break out of the argument parsing loop if error_string is set */
703 if (rrd_test_error()){
708 rrd.live_head->last_up = current_time;
713 /* if we got here and if there is an error and if the file has not been
714 * written to, then close things up and return. */
715 if (rrd_test_error()) {
725 /* aargh ... that was tough ... so many loops ... anyway, its done.
726 * we just need to write back the live header portion now*/
728 if (fseek(rrd_file, (sizeof(stat_head_t)
729 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
730 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
732 rrd_set_error("seek rrd for live header writeback");
742 if(fwrite( rrd.live_head,
743 sizeof(live_head_t), 1, rrd_file) != 1){
744 rrd_set_error("fwrite live_head to rrd");
754 if(fwrite( rrd.pdp_prep,
756 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
757 rrd_set_error("ftwrite pdp_prep to rrd");
767 if(fwrite( rrd.cdp_prep,
769 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
770 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
772 rrd_set_error("ftwrite cdp_prep to rrd");
782 if(fwrite( rrd.rra_ptr,
784 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
785 rrd_set_error("fwrite rra_ptr to rrd");
795 /* OK now close the files and free the memory */
796 if(fclose(rrd_file) != 0){
797 rrd_set_error("closing rrd");
815 * get exclusive lock to whole file.
816 * lock gets removed when we close the file
818 * returns 0 on success
821 LockRRD(FILE *rrdfile)
823 int rrd_fd; /* File descriptor for RRD */
826 rrd_fd = fileno(rrdfile);
831 lock.l_type = F_WRLCK; /* exclusive write lock */
832 lock.l_len = 0; /* whole file */
833 lock.l_start = 0; /* start of file */
834 lock.l_whence = SEEK_SET; /* end of file */
836 stat = fcntl(rrd_fd, F_SETLK, &lock);
840 if ( _fstat( rrd_fd, &st ) == 0 ) {
841 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );