Support for COMPUTE data sources (CDEF data sources). Removes the RPN
[rrdtool.git] / src / rrd_update.c
index 08ecb49..35ac6a6 100644 (file)
@@ -5,6 +5,14 @@
  *****************************************************************************
  * $Id$
  * $Log$
+ * Revision 1.4  2001/03/10 23:54:41  oetiker
+ * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
+ * parser and calculator from rrd_graph and puts then in a new file,
+ * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
+ * clean-up of aberrant behavior stuff, including a bug fix.
+ * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
+ * -- Jake Brutlag <jakeb@corp.webtv.net>
+ *
  * Revision 1.3  2001/03/04 13:01:55  oetiker
  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
@@ -24,6 +32,8 @@
 #include "rrd_tool.h"
 #include <sys/types.h>
 #include <fcntl.h>
+#include "rrd_hw.h"
+#include "rrd_rpncalc.h"
 
 #ifdef WIN32
  #include <sys/locking.h>
  #include <io.h>
 #endif
 
-/* Prototypes */
+/* Local prototypes */
 int LockRRD(FILE *rrd_file);
 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
     unsigned short CDP_scratch_idx, FILE *rrd_file);
  
-/*#define DEBUG */
-
 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
 
 
@@ -124,6 +132,9 @@ rrd_update(int argc, char **argv)
                                         /* index into the CDP scratch array */
     enum cf_en       current_cf;
                                         /* numeric id of the current consolidation function */
+    rpnstack_t       rpnstack; /* used for COMPUTE DS */
+
+    rpnstack_init(&rpnstack);
 
     while (1) {
        static struct option long_options[] =
@@ -209,14 +220,19 @@ rrd_update(int argc, char **argv)
        return(-1);
     }
     /* initialize template redirector */
-    /* default config
+    /* default config example (assume DS 1 is a CDEF DS)
        tmpl_idx[0] -> 0; (time)
        tmpl_idx[1] -> 1; (DS 0)
-       tmpl_idx[2] -> 2; (DS 1)
-       tmpl_idx[3] -> 3; (DS 2)
-       ... */
-    for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
-    tmpl_cnt=rrd.stat_head->ds_cnt+1;
+       tmpl_idx[2] -> 3; (DS 2)
+       tmpl_idx[3] -> 4; (DS 3) */
+    tmpl_idx[0] = 0; /* time */
+    for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
+       {
+          if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
+             tmpl_idx[ii++]=i;
+       }
+    tmpl_cnt= ii;
+
     if (template) {
        char *dsname;
        int tmpl_len;
@@ -401,7 +417,12 @@ rrd_update(int argc, char **argv)
        for(i=0;i<rrd.stat_head->ds_cnt;i++){
            enum dst_en dst_idx;
            dst_idx= dst_conv(rrd.ds_def[i].dst);
+               /* NOTE: DST_CDEF should never enter this if block, because
+                * updvals[i+1][0] is initialized to 'U'; unless the caller
+                * accidently specified a value for the DST_CDEF. To handle 
+                * this case, an extra check is required. */
            if((updvals[i+1][0] != 'U') &&
+                  (dst_idx != DST_CDEF) &&
               rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
               double rate = DNAN;
               /* the data source type defines how to process the data */
@@ -512,7 +533,6 @@ rrd_update(int argc, char **argv)
            pdp_new[] contains rate*seconds from the latest run.
            pdp_temp[] will contain the rate for cdp */
 
-
            for(i=0;i<rrd.stat_head->ds_cnt;i++){
                /* update pdp_prep to the current pdp_st */
                if(isnan(pdp_new[i]))
@@ -533,6 +553,27 @@ rrd_update(int argc, char **argv)
                                   - proc_pdp_st
                                   - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
                }
+
+               /* process CDEF data sources; remember each CDEF DS can
+                * only reference other DS with a lower index number */
+           if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
+                  rpnp_t *rpnp;
+                  rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
+                  /* substitue data values for OP_VARIABLE nodes */
+                  for (ii = 0; rpnp[ii].op != OP_END; ii++)
+                  {
+                         if (rpnp[ii].op == OP_VARIABLE) {
+                                rpnp[ii].op = OP_NUMBER;
+                                rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
+                         }
+                  }
+                  /* run the rpn calculator */
+                  if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
+                         free(rpnp);
+                         break; /* exits the data sources pdp_temp loop */
+                  }
+               }
+        
                /* make pdp_prep ready for the next run */
                if(isnan(pdp_new[i])){
                    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
@@ -555,6 +596,12 @@ rrd_update(int argc, char **argv)
 #endif
            }
 
+               /* if there were errors during the last loop, bail out here */
+           if (rrd_test_error()){
+              free(step_start);
+              break;
+           }
+
                /* compute the number of elapsed pdp_st moments */
                elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
 #ifdef DEBUG
@@ -891,6 +938,7 @@ rrd_update(int argc, char **argv)
                         lookup_seasonal(&rrd,i,rra_start,rrd_file,
                                    elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
                                &seasonal_coef);
+                 rra_current = ftell(rrd_file);
                          }
                          if (rrd_test_error()) break;
                      /* loop over data soures within each RRA */
@@ -936,6 +984,7 @@ rrd_update(int argc, char **argv)
                   }
                   rra_current = rra_pos_tmp;
                }
+
 #ifdef DEBUG
            fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
 #endif
@@ -988,6 +1037,7 @@ rrd_update(int argc, char **argv)
     if (seasonal_coef != NULL) free(seasonal_coef);
     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
        if (rra_step_cnt != NULL) free(rra_step_cnt);
+    rpnstack_free(&rpnstack);
 
     /* if we got here and if there is an error and if the file has not been
      * written to, then close things up and return. */
@@ -997,7 +1047,7 @@ rrd_update(int argc, char **argv)
        rrd_free(&rrd);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1014,7 +1064,7 @@ rrd_update(int argc, char **argv)
        rrd_free(&rrd);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1026,7 +1076,7 @@ rrd_update(int argc, char **argv)
        free(tmpl_idx);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1039,7 +1089,7 @@ rrd_update(int argc, char **argv)
        free(tmpl_idx);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1054,7 +1104,7 @@ rrd_update(int argc, char **argv)
        rrd_free(&rrd);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1067,7 +1117,7 @@ rrd_update(int argc, char **argv)
        rrd_free(&rrd);
        free(pdp_temp);
        free(pdp_new);
-        fclose(rrd_file);
+       fclose(rrd_file);
        return(-1);
     }
 
@@ -1172,7 +1222,7 @@ write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
          fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
             rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
             rrd -> rra_def[rra_idx].cf_nam);
-#endif
+#endif 
 
          if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
                 sizeof(rrd_value_t),1,rrd_file) != 1)