Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
[rrdtool.git] / src / rrd_update.c
index 1e5a83e..08ecb49 100644 (file)
@@ -5,6 +5,13 @@
  *****************************************************************************
  * $Id$
  * $Log$
+ * Revision 1.3  2001/03/04 13:01:55  oetiker
+ * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
+ * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
+ * This is backwards compatible! But new files using the Aberrant stuff are not readable
+ * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
+ * -- Jake Brutlag <jakeb@corp.webtv.net>
+ *
  * Revision 1.2  2001/03/04 11:14:25  oetiker
  * added at-style-time@value:value syntax to rrd_update
  * --  Dave Bodenstab <imdave@mcs.net>
  #include <io.h>
 #endif
 
-
 /* Prototypes */
 int LockRRD(FILE *rrd_file);
-
+void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
+    unsigned short CDP_scratch_idx, FILE *rrd_file);
 /*#define DEBUG */
 
+#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
+
 
 #ifdef STANDALONE
 int 
@@ -56,7 +66,8 @@ rrd_update(int argc, char **argv)
 {
 
     int              arg_i = 2;
-    long             i,ii,iii;
+    short            j;
+    long             i,ii,iii=1;
 
     unsigned long    rra_begin;          /* byte pointer to the rra
                                          * area in the rrd file.  this
@@ -81,8 +92,6 @@ rrd_update(int argc, char **argv)
                                          * was last updated */
     unsigned long    occu_pdp_age;       /* how long ago was the last
                                          * pdp_step time */
-    unsigned long    pdp_st;             /* helper for cdp_prep 
-                                         * processing */
     rrd_value_t      *pdp_new;           /* prepare the incoming data
                                          * to be added the the
                                          * existing entry */
@@ -98,9 +107,23 @@ rrd_update(int argc, char **argv)
     rrd_t            rrd;
     time_t           current_time = time(NULL);
     char             **updvals;
-    int              wrote_to_file = 0;
-    char             *template = NULL;          
-
+    int              schedule_smooth = 0;
+    char             *template = NULL;   
+       rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
+                                        /* a vector of future Holt-Winters seasonal coefs */
+    unsigned long    elapsed_pdp_st;
+                                        /* number of elapsed PDP steps since last update */
+    unsigned long    *rra_step_cnt = NULL;
+                                        /* number of rows to be updated in an RRA for a data
+                                         * value. */
+    unsigned long    start_pdp_offset;
+                                        /* number of PDP steps since the last update that
+                                         * are assigned to the first CDP to be generated
+                                         * since the last update. */
+    unsigned short   scratch_idx;
+                                        /* index into the CDP scratch array */
+    enum cf_en       current_cf;
+                                        /* numeric id of the current consolidation function */
 
     while (1) {
        static struct option long_options[] =
@@ -123,7 +146,7 @@ rrd_update(int argc, char **argv)
 
        case '?':
            rrd_set_error("unknown option '%s'",argv[optind-1]);
-            rrd_free(&rrd);
+            rrd_free(&rrd);            
            return(-1);
        }
     }
@@ -158,7 +181,7 @@ rrd_update(int argc, char **argv)
       rrd_free(&rrd);
       fclose(rrd_file);
       return(-1);   
-    }
+    } 
 
     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
        rrd_set_error("allocating updvals pointer array");
@@ -248,7 +271,7 @@ rrd_update(int argc, char **argv)
        enum {atstyle, normal} timesyntax;
        struct time_value ds_tv;
         if (stepper == NULL){
-                rrd_set_error("faild duplication argv entry");
+                rrd_set_error("failed duplication argv entry");
                 free(updvals);
                 free(pdp_temp);  
                 free(tmpl_idx);
@@ -329,7 +352,7 @@ rrd_update(int argc, char **argv)
        }
        
        
-       /* seek to the beginning of the rrd's */
+       /* seek to the beginning of the rra's */
        if (rra_current != rra_begin) {
            if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
                rrd_set_error("seek error in rrd");
@@ -532,183 +555,439 @@ rrd_update(int argc, char **argv)
 #endif
            }
 
-
-           /* now we have to integrate this data into the cdp_prep areas */
-           /* going through the round robin archives */
-           for(i = 0;
-               i < rrd.stat_head->rra_cnt;
-               i++){
-               enum cf_en current_cf = cf_conv(rrd.rra_def[i].cf_nam);
-               /* going through all pdp_st moments which have occurred 
-                * since the last run */
-               for(pdp_st  = proc_pdp_st+rrd.stat_head->pdp_step; 
-                   pdp_st <= occu_pdp_st; 
-                   pdp_st += rrd.stat_head->pdp_step){
-
+               /* compute the number of elapsed pdp_st moments */
+               elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
 #ifdef DEBUG
-                   fprintf(stderr,"RRA %lu STEP %lu\n",i,pdp_st);
+               fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
 #endif
+               if (rra_step_cnt == NULL)
+               {
+                  rra_step_cnt = (unsigned long *) 
+                         malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
+               }
 
-                   if((pdp_st %
-                       (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
+           for(i = 0, rra_start = rra_begin;
+               i < rrd.stat_head->rra_cnt;
+           rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
+               i++)
+               {
+               current_cf = cf_conv(rrd.rra_def[i].cf_nam);
+               start_pdp_offset = rrd.rra_def[i].pdp_cnt -
+                  (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
+        if (start_pdp_offset <= elapsed_pdp_st) {
+           rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
+                     rrd.rra_def[i].pdp_cnt + 1;
+           } else {
+                  rra_step_cnt[i] = 0;
+               }
 
-                       /* later on the cdp_prep values will be transferred to
-                        * the rra.  we want to be in the right place. */
-                       rrd.rra_ptr[i].cur_row++;
-                       if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
-                           /* oops ... we have to wrap the beast ... */
-                           rrd.rra_ptr[i].cur_row=0;                   
+               if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
+               {
+                  /* If this is a bulk update, we need to skip ahead in the seasonal
+                       * arrays so that they will be correct for the next observed value;
+                       * note that for the bulk update itself, no update will occur to
+                       * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
+                       * be set to DNAN. */
+           if (rra_step_cnt[i] > 2) 
+                  {
+                         /* skip update by resetting rra_step_cnt[i],
+                          * note that this is not data source specific; this is due
+                          * to the bulk update, not a DNAN value for the specific data
+                          * source. */
+                         rra_step_cnt[i] = 0;
+              lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
+                            &last_seasonal_coef);
+                     lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
+                            &seasonal_coef);
+                  }
+               
+                 /* periodically run a smoother for seasonal effects */
+                 /* Need to use first cdp parameter buffer to track
+                  * burnin (burnin requires a specific smoothing schedule).
+                  * The CDP_init_seasonal parameter is really an RRA level,
+                  * not a data source within RRA level parameter, but the rra_def
+                  * is read only for rrd_update (not flushed to disk). */
+                 iii = i*(rrd.stat_head -> ds_cnt);
+                 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
+                         <= BURNIN_CYCLES)
+                 {
+                    if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
+                                > rrd.rra_def[i].row_cnt - 1) {
+                          /* mark off one of the burnin cycles */
+                          ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
+                      schedule_smooth = 1;
+                        }  
+                 } else {
+                        /* someone has no doubt invented a trick to deal with this
+                         * wrap around, but at least this code is clear. */
+                        if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
+                            rrd.rra_ptr[i].cur_row)
+                        {
+                                /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
+                                 * mapping between PDP and CDP */
+                                if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
+                                       >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
+                                {
 #ifdef DEBUG
-                       fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
+                                       fprintf(stderr,
+                                       "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
+                    rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
+                                       rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
 #endif
-                       /* determine if a seek is even needed. */
-                       rra_pos_tmp = rra_start +
-                               rrd.stat_head->ds_cnt*rrd.rra_ptr[i].cur_row*sizeof(rrd_value_t);
-                       if(rra_pos_tmp != rra_current) {
-                           if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
-                               rrd_set_error("seek error in rrd");
-                               break;
-                           }
-                           rra_current = rra_pos_tmp;
-                       }
+                                       schedule_smooth = 1;
+                                }
+             } else {
+                                /* can't rely on negative numbers because we are working with
+                                 * unsigned values */
+                                /* Don't need modulus here. If we've wrapped more than once, only
+                                 * one smooth is executed at the end. */
+                                if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
+                                       && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
+                                       >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
+                                {
 #ifdef DEBUG
-                       fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
+                                       fprintf(stderr,
+                                       "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
+                    rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
+                                       rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
 #endif
-                   }
+                                       schedule_smooth = 1;
+                                }
+                        }
+                 }
+
+             rra_current = ftell(rrd_file); 
+               } /* if cf is DEVSEASONAL or SEASONAL */
 
+        if (rrd_test_error()) break;
+
+                   /* update CDP_PREP areas */
+                   /* loop over data soures within each RRA */
                    for(ii = 0;
                        ii < rrd.stat_head->ds_cnt;
-                       ii++){
+                       ii++)
+                       {
+                       
+                       /* iii indexes the CDP prep area for this data source within the RRA */
                        iii=i*rrd.stat_head->ds_cnt+ii;
-                   
-                       /* the contents of cdp_prep[].scratch[CDP_val].u_val depends
-                        * on the consolidation function ! */
-                   
-                       if (isnan(pdp_temp[ii])){    /* pdp is unknown */
-                           rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt++;
+
+                       if (rrd.rra_def[i].pdp_cnt > 1) {
+                         
+                          if (rra_step_cnt[i] > 0) {
+                          /* If we are in this block, as least 1 CDP value will be written to
+                               * disk, this is the CDP_primary_val entry. If more than 1 value needs
+                               * to be written, then the "fill in" value is the CDP_secondary_val
+                               * entry. */
+                                 if (isnan(pdp_temp[ii]))
+                  {
+                                        rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
+                                        rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
+                                 } else {
+                                        /* CDP_secondary value is the RRA "fill in" value for intermediary
+                                         * CDP data entries. No matter the CF, the value is the same because
+                                         * the average, max, min, and last of a list of identical values is
+                                         * the same, namely, the value itself. */
+                                        rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
+                                 }
+                     
+                                 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
+                                     > rrd.rra_def[i].pdp_cnt*
+                                     rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
+                                 {
+                                        rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
+                                        /* initialize carry over */
+                                        if (current_cf == CF_AVERAGE) {
+                                                  if (isnan(pdp_temp[ii])) { 
+                                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
+                                                  } else {
+                                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
+                                                                ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
+                                                  }
+                                        } else {
+                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                        }
+                                 } else {
+                                        rrd_value_t cum_val, cur_val; 
+                                    switch (current_cf) {
+                                               case CF_AVERAGE:
+                                                 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
+                                                 cur_val = IFDNAN(pdp_temp[ii],0.0);
+                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
+                                              (cum_val + cur_val) /
+                                          (rrd.rra_def[i].pdp_cnt
+                                              -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
+                                                  /* initialize carry over value */
+                                                  if (isnan(pdp_temp[ii])) { 
+                                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
+                                                  } else {
+                                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
+                                                                ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
+                                                  }
+                                                  break;
+                                               case CF_MAXIMUM:
+                                                 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
+                                                 cur_val = IFDNAN(pdp_temp[ii],-DINF);
 #ifdef DEBUG
-                           fprintf(stderr,"  ** UNKNOWN ADD %lu\n",
-                                   rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
+                                                 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
+                                                         isnan(pdp_temp[ii])) {
+                                                    fprintf(stderr,
+                                                               "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
+                                                               i,ii);
+                                                        exit(-1);
+                                                 }
 #endif
-                       } else {
-                           if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)){
-                               /* cdp_prep is unknown when it does not
-                                * yet contain data. It can not be zero for
-                                * things like mim and max consolidation
-                                * functions */
+                                                 if (cur_val > cum_val)
+                                                        rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
+                                                 else
+                                                        rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
+                                                 /* initialize carry over value */
+                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                                 break;
+                                               case CF_MINIMUM:
+                                                 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
+                                                 cur_val = IFDNAN(pdp_temp[ii],DINF);
 #ifdef DEBUG
-                               fprintf(stderr,"  ** INIT CDP %e\n", pdp_temp[ii]);
+                                                 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
+                                                         isnan(pdp_temp[ii])) {
+                                                    fprintf(stderr,
+                                                               "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
+                                                               i,ii);
+                                                        exit(-1);
+                                                 }
 #endif
-                               rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
-                           }
-                           else {
-                               switch (current_cf){
-                                   case CF_AVERAGE:                            
-                                       rrd.cdp_prep[iii].scratch[CDP_val].u_val+=pdp_temp[ii];
+                                                 if (cur_val < cum_val)
+                                                        rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
+                                                 else
+                                                        rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
+                                                 /* initialize carry over value */
+                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                                 break;
+                                               case CF_LAST:
+                                               default:
+                                                  rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
+                                                  /* initialize carry over value */
+                                                  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                               break;
+                                        }
+                                 } /* endif meets xff value requirement for a valid value */
+                                 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
+                                  * is set because CDP_unkn_pdp_cnt is required to compute that value. */
+                                 if (isnan(pdp_temp[ii]))
+                                        rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
+                                               (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
+                                 else
+                                        rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
+               } else  /* rra_step_cnt[i]  == 0 */
+                          {
 #ifdef DEBUG
-                                       fprintf(stderr,"  ** AVERAGE %e\n", 
-                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val);
+                                 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
+                                 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
+                                        i,ii);
+                                 } else {
+                                 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
+                                        i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
+                                 }
 #endif
-                                       break;    
-                                   case CF_MINIMUM:
-                                       if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
+                                 if (isnan(pdp_temp[ii])) {
+                                rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
+                                 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
+                                 {
+                                        if (current_cf == CF_AVERAGE) {
+                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
+                                                  elapsed_pdp_st;
+                                        } else {
                                            rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                        }
 #ifdef DEBUG
-                                       fprintf(stderr,"  ** MINIMUM %e\n", 
-                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val);
+                                        fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
+                                           i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
 #endif
-                                       break;
-                                   case CF_MAXIMUM:
-                                       if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
-                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                 } else {
+                                        switch (current_cf) {
+                                        case CF_AVERAGE:
+                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
+                                                  elapsed_pdp_st;
+                                               break;
+                                        case CF_MINIMUM:
+                                               if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
+                                                  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                               break; 
+                                        case CF_MAXIMUM:
+                                               if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
+                                                  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                               break; 
+                                        case CF_LAST:
+                                        default:
+                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
+                                               break;
+                                        }
+                                 }
+                          }
+                       } else { /* rrd.rra_def[i].pdp_cnt == 1 */
+                          if (elapsed_pdp_st > 2)
+                          {
+                                  switch (current_cf) {
+                                  case CF_AVERAGE:
+                                  default:
+                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
+                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
+                                         break;
+                   case CF_SEASONAL:
+                                  case CF_DEVSEASONAL:
+                                         /* need to update cached seasonal values, so they are consistent
+                                          * with the bulk update */
+                      /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
+                                          * CDP_last_deviation are the same. */
+                      rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
+                                                last_seasonal_coef[ii];
+                                         rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
+                                                seasonal_coef[ii];
+                                         break;
+                   case CF_HWPREDICT:
+                                         /* need to update the null_count and last_null_count.
+                                          * even do this for non-DNAN pdp_temp because the
+                                          * algorithm is not learning from batch updates. */
+                                         rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
+                                                elapsed_pdp_st;
+                                         rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
+                                                elapsed_pdp_st - 1;
+                                         /* fall through */
+                                  case CF_DEVPREDICT:
+                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
+                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
+                                         break;
+                   case CF_FAILURES:
+                                         /* do not count missed bulk values as failures */
+                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
+                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
+                                         /* need to reset violations buffer.
+                                          * could do this more carefully, but for now, just
+                                          * assume a bulk update wipes away all violations. */
+                      erase_violations(&rrd, iii, i);
+                                         break;
+                                  }
+                          } 
+                       } /* endif rrd.rra_def[i].pdp_cnt == 1 */
+
+                       if (rrd_test_error()) break;
+
+                       } /* endif data sources loop */
+        } /* end RRA Loop */
+
+               /* this loop is only entered if elapsed_pdp_st < 3 */
+               for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
+                        j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
+               {
+              for(i = 0, rra_start = rra_begin;
+                  i < rrd.stat_head->rra_cnt;
+              rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
+                  i++)
+                  {
+                         if (rrd.rra_def[i].pdp_cnt > 1) continue;
+
+                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
+                         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
+                         {
+                        lookup_seasonal(&rrd,i,rra_start,rrd_file,
+                                   elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
+                               &seasonal_coef);
+                         }
+                         if (rrd_test_error()) break;
+                     /* loop over data soures within each RRA */
+                     for(ii = 0;
+                         ii < rrd.stat_head->ds_cnt;
+                         ii++)
+                         {
+                            update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
+                                       i*(rrd.stat_head->ds_cnt) + ii,i,ii,
+                                   scratch_idx, seasonal_coef);
+                         }
+           } /* end RRA Loop */
+                  if (rrd_test_error()) break;
+           } /* end elapsed_pdp_st loop */
+
+               if (rrd_test_error()) break;
+
+               /* Ready to write to disk */
+               /* Move sequentially through the file, writing one RRA at a time.
+                * Note this architecture divorces the computation of CDP with
+                * flushing updated RRA entries to disk. */
+           for(i = 0, rra_start = rra_begin;
+               i < rrd.stat_head->rra_cnt;
+           rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
+               i++) {
+               /* is there anything to write for this RRA? If not, continue. */
+        if (rra_step_cnt[i] == 0) continue;
+
+               /* write the first row */
 #ifdef DEBUG
-                                       fprintf(stderr,"  ** MAXIMUM %e\n", 
-                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val);
+        fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
 #endif
-                                       break;
-                                   case CF_LAST:
-                                       rrd.cdp_prep[iii].scratch[CDP_val].u_val=pdp_temp[ii];
+           rrd.rra_ptr[i].cur_row++;
+           if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
+                  rrd.rra_ptr[i].cur_row = 0; /* wrap around */
+               /* positition on the first row */
+               rra_pos_tmp = rra_start +
+                  (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
+               if(rra_pos_tmp != rra_current) {
+                  if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
+                     rrd_set_error("seek error in rrd");
+                     break;
+                  }
+                  rra_current = rra_pos_tmp;
+               }
 #ifdef DEBUG
-                                       fprintf(stderr,"  ** LAST %e\n", 
-                                               rrd.cdp_prep[iii].scratch[CDP_val].u_val);
+           fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
 #endif
-                                       break;    
-                                   default:
-                                       rrd_set_error("Unknown cf %s",
-                                                     rrd.rra_def[i].cf_nam);
-                                       break;
-                               }
-                           }
-                       }
-
-
-                       /* is the data in the cdp_prep ready to go into
-                        * its rra ? */
-                       if((pdp_st % 
-                           (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
-
-                           /* prepare cdp_pref for its transition to the rra. */
-                           if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt 
-                               > rrd.rra_def[i].pdp_cnt*
-                               rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
-                               /* to much of the cdp_prep is unknown ... */
-                               rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
-                           else if (current_cf == CF_AVERAGE){
-                               /* for a real average we have to divide
-                                * the sum we built earlier on. While ignoring
-                                * the unknown pdps */
-                               rrd.cdp_prep[iii].scratch[CDP_val].u_val 
-                                       /= (rrd.rra_def[i].pdp_cnt
-                                           -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
-                           }
-                           /* we can write straight away, because we are
-                            * already in the right place ... */
-
+               scratch_idx = CDP_primary_val;
+               write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
+               if (rrd_test_error()) break;
+
+               /* write other rows of the bulk update, if any */
+               scratch_idx = CDP_secondary_val;
+               for ( ; rra_step_cnt[i] > 1; 
+                    rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
+               {
+                  if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
+                  {
 #ifdef DEBUG
-                           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld\n",
-                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val,ftell(rrd_file));
+              fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
+                         rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
 #endif
-
-                           if(fwrite(&(rrd.cdp_prep[iii].scratch[CDP_val].u_val),
-                                     sizeof(rrd_value_t),1,rrd_file) != 1){
-                               rrd_set_error("writing rrd");
-                               break;
-                           }
-                           rra_current += sizeof(rrd_value_t);
-                           wrote_to_file = 1;
-
+                         /* wrap */
+                         rrd.rra_ptr[i].cur_row = 0;
+                         /* seek back to beginning of current rra */
+                     if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
+                         {
+                        rrd_set_error("seek error in rrd");
+                        break;
+                         }
 #ifdef DEBUG
-                           fprintf(stderr,"  -- RRA WROTE new at %ld\n",ftell(rrd_file));
+                 fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
 #endif
-
-                           /* make cdp_prep ready for the next run */
-                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
-                           rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
-                       }
-                   }
-                   /* break out of this loop if error_string has been set */
-                   if (rrd_test_error())
-                       break;
+                         rra_current = rra_start;
+                  }
+                  write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
                }
-               /* break out of this loop if error_string has been set */
+               
                if (rrd_test_error())
-                   break;
-               /* to be able to position correctly in the next rra w move
-                * the rra_start pointer on to the next rra */
-               rra_start += rrd.rra_def[i].row_cnt
-                       *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
+                 break;
+               } /* RRA LOOP */
 
-           }
            /* break out of the argument parsing loop if error_string is set */
            if (rrd_test_error()){
-               free(step_start);
-               break;
-           }
-       }
+                  free(step_start);
+                  break;
+           } 
+           
+       } /* endif a pdp_st has occurred */ 
        rrd.live_head->last_up = current_time;
        free(step_start);
-    }
+    } /* function argument loop */
 
+    if (seasonal_coef != NULL) free(seasonal_coef);
+    if (last_seasonal_coef != NULL) free(last_seasonal_coef);
+       if (rra_step_cnt != NULL) free(rra_step_cnt);
 
     /* if we got here and if there is an error and if the file has not been
      * written to, then close things up and return. */
@@ -803,6 +1082,36 @@ rrd_update(int argc, char **argv)
        return(-1);
     }
 
+    /* calling the smoothing code here guarantees at most
+        * one smoothing operation per rrd_update call. Unfortunately,
+        * it is possible with bulk updates, or a long-delayed update
+        * for smoothing to occur off-schedule. This really isn't
+        * critical except during the burning cycles. */
+       if (schedule_smooth)
+       {
+#ifndef WIN32
+         rrd_file = fopen(argv[optind],"r+");
+#else
+         rrd_file = fopen(argv[optind],"rb+");
+#endif
+         rra_start = rra_begin;
+         for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
+         {
+           if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
+               cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
+           {
+#ifdef DEBUG
+             fprintf(stderr,"Running smoother for rra %ld\n",i);
+#endif
+             apply_smoother(&rrd,i,rra_start,rrd_file);
+             if (rrd_test_error())
+               break;
+           }
+           rra_start += rrd.rra_def[i].row_cnt
+             *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
+         }
+         fclose(rrd_file);
+       }
     rrd_free(&rrd);
     free(updvals);
     free(tmpl_idx);
@@ -847,3 +1156,30 @@ LockRRD(FILE *rrdfile)
 
     return(stat);
 }
+
+
+void
+write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
+   unsigned short CDP_scratch_idx, FILE *rrd_file)
+{
+   unsigned long ds_idx, cdp_idx;
+
+   for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
+   {
+      /* compute the cdp index */
+      cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
+#ifdef DEBUG
+         fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
+            rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
+            rrd -> rra_def[rra_idx].cf_nam);
+#endif
+
+         if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
+                sizeof(rrd_value_t),1,rrd_file) != 1)
+         { 
+            rrd_set_error("writing rrd");
+                return;
+         }
+         *rra_current += sizeof(rrd_value_t);
+       }
+}