More updates from Bernhard Fischer
[rrdtool.git] / src / rrd_hw.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5  *****************************************************************************
6  * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
11
12 /* #define DEBUG */
13
14 /* private functions */
/* MyMod: modulus helper taking a signed value; presumably yields a
 * result in [0, mod) even for negative val — definition not in view,
 * TODO confirm. */
unsigned long MyMod(
    signed long val,
    unsigned long mod);
/* Advance the HWPREDICT (Holt-Winters) model one step for one DS. */
int       update_hwpredict(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx);
/* Smooth the seasonal coefficient of a SEASONAL RRA for one DS. */
int       update_seasonal(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef);
/* Copy the current/last deviation from the dependent DEVSEASONAL RRA. */
int       update_devpredict(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx);
/* Smooth the seasonal deviation (absolute forecast error) for one DS. */
int       update_devseasonal(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_dev);
/* Detect confidence-band violations and update the failure window. */
int       update_failures(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx);
50
/* Advance the Holt-Winters (HWPREDICT) model one step for a single data
 * source: save the previous intercept/slope, fetch the matching seasonal
 * coefficient, compute the prediction, and exponentially smooth the
 * intercept and slope using the RRA's alpha and beta parameters.
 *
 * rrd             in-core rrd structure; cdp_prep scratch areas are updated
 * cdp_idx         index of this RRA/DS pair's cdp_prep area
 * rra_idx         index of the HWPREDICT RRA
 * ds_idx          index of the data source
 * CDP_scratch_idx scratch slot holding the observed value on entry; the
 *                 computed prediction is written back to that slot
 * returns 0 (always)
 */
int update_hwpredict(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx)
{
    rrd_value_t prediction, seasonal_coef;
    unsigned long dependent_rra_idx, seasonal_cdp_idx;
    unival   *coefs = rrd->cdp_prep[cdp_idx].scratch;
    rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);

    /* save coefficients from current prediction */
    coefs[CDP_hw_last_intercept].u_val = coefs[CDP_hw_intercept].u_val;
    coefs[CDP_hw_last_slope].u_val = coefs[CDP_hw_slope].u_val;
    coefs[CDP_last_null_count].u_cnt = coefs[CDP_null_count].u_cnt;

    /* retrieve the current seasonal coef; if the dependent SEASONAL RRA
     * precedes this one it has already been updated this step, so read its
     * saved (last) value rather than the freshly smoothed one */
    dependent_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
    seasonal_cdp_idx = dependent_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
    if (dependent_rra_idx < rra_idx)
        seasonal_coef =
            rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
            u_val;
    else
        seasonal_coef =
            rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;

    /* compute the prediction */
    if (isnan(coefs[CDP_hw_intercept].u_val)
        || isnan(coefs[CDP_hw_slope].u_val)
        || isnan(seasonal_coef)) {
        prediction = DNAN;

        /* bootstrap initialization of slope and intercept */
        if (isnan(coefs[CDP_hw_intercept].u_val) &&
            !isnan(coefs[CDP_scratch_idx].u_val)) {
#ifdef DEBUG
            fprintf(stderr, "Initialization of slope/intercept\n");
#endif
            coefs[CDP_hw_intercept].u_val = coefs[CDP_scratch_idx].u_val;
            coefs[CDP_hw_last_intercept].u_val = coefs[CDP_scratch_idx].u_val;
            /* initialize the slope to 0 */
            coefs[CDP_hw_slope].u_val = 0.0;
            coefs[CDP_hw_last_slope].u_val = 0.0;
            /* initialize null count to 1 */
            coefs[CDP_null_count].u_cnt = 1;
            coefs[CDP_last_null_count].u_cnt = 1;
        }
        /* if seasonal coefficient is NA, then don't update intercept, slope */
    } else {
        /* forecast = intercept + slope * (steps since last real update)
         *            + seasonal component; null_count accounts for
         *            intervening NA observations */
        prediction = coefs[CDP_hw_intercept].u_val +
            (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
            + seasonal_coef;
#ifdef DEBUG
        fprintf(stderr, "computed prediction: %f\n", prediction);
#endif
        if (isnan(coefs[CDP_scratch_idx].u_val)) {
            /* NA value, no updates of intercept, slope;
             * increment the null count */
            (coefs[CDP_null_count].u_cnt)++;
        } else {
#ifdef DEBUG
            fprintf(stderr, "Updating intercept, slope\n");
#endif
            /* update the intercept:
             * alpha * (deseasonalized observation)
             *   + (1 - alpha) * (previous level forecast) */
            coefs[CDP_hw_intercept].u_val =
                (current_rra->par[RRA_hw_alpha].u_val) *
                (coefs[CDP_scratch_idx].u_val - seasonal_coef) + (1 -
                                                                  current_rra->
                                                                  par
                                                                  [RRA_hw_alpha].
                                                                  u_val) *
                (coefs[CDP_hw_intercept].u_val +
                 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
            /* update the slope:
             * beta * (intercept change) + (1 - beta) * previous slope */
            coefs[CDP_hw_slope].u_val =
                (current_rra->par[RRA_hw_beta].u_val) *
                (coefs[CDP_hw_intercept].u_val -
                 coefs[CDP_hw_last_intercept].u_val) + (1 -
                                                        current_rra->
                                                        par[RRA_hw_beta].
                                                        u_val) *
                (coefs[CDP_hw_slope].u_val);
            /* reset the null count */
            coefs[CDP_null_count].u_cnt = 1;
        }
    }

    /* store the prediction for writing */
    coefs[CDP_scratch_idx].u_val = prediction;
    return 0;
}
144
145 int lookup_seasonal(
146     rrd_t *rrd,
147     unsigned long rra_idx,
148     unsigned long rra_start,
149     rrd_file_t *rrd_file,
150     unsigned long offset,
151     rrd_value_t **seasonal_coef)
152 {
153     unsigned long pos_tmp;
154
155     /* rra_ptr[].cur_row points to the rra row to be written; this function
156      * reads cur_row + offset */
157     unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
158
159     /* handle wrap around */
160     if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
161         row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
162
163     /* rra_start points to the appropriate rra block in the file */
164     /* compute the pointer to the appropriate location in the file */
165     pos_tmp =
166         rra_start +
167         (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
168
169     /* allocate memory if need be */
170     if (*seasonal_coef == NULL)
171         *seasonal_coef =
172             (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
173                                    sizeof(rrd_value_t));
174     if (*seasonal_coef == NULL) {
175         rrd_set_error("memory allocation failure: seasonal coef");
176         return -1;
177     }
178
179     if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
180         if (rrd_read
181             (rrd_file, *seasonal_coef,
182              sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
183             == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
184             /* success! */
185             /* we can safely ignore the rule requiring a seek operation between read
186              * and write, because this read moves the file pointer to somewhere
187              * in the file other than the next write location.
188              * */
189             return 0;
190         } else {
191             rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
192                           pos_tmp);
193         }
194     } else {
195         rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
196                       pos_tmp);
197     }
198
199     return -1;
200 }
201
202 int update_seasonal(
203     rrd_t *rrd,
204     unsigned long cdp_idx,
205     unsigned long rra_idx,
206     unsigned long ds_idx,
207     unsigned short CDP_scratch_idx,
208     rrd_value_t *seasonal_coef)
209 {
210 /* TODO: extract common if subblocks in the wake of I/O optimization */
211     rrd_value_t intercept, seasonal;
212     rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
213     rra_def_t *hw_rra =
214         &(rrd->rra_def[current_rra->par[RRA_dependent_rra_idx].u_cnt]);
215     /* obtain cdp_prep index for HWPREDICT */
216     unsigned long hw_cdp_idx = (current_rra->par[RRA_dependent_rra_idx].u_cnt)
217         * (rrd->stat_head->ds_cnt) + ds_idx;
218     unival   *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;
219
220     /* update seasonal coefficient in cdp prep areas */
221     seasonal = rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val;
222     rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = seasonal;
223     rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val =
224         seasonal_coef[ds_idx];
225
226     /* update seasonal value for disk */
227     if (current_rra->par[RRA_dependent_rra_idx].u_cnt < rra_idx)
228         /* associated HWPREDICT has already been updated */
229         /* check for possible NA values */
230         if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
231             /* no update, store the old value unchanged,
232              * doesn't matter if it is NA */
233             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
234         } else if (isnan(coefs[CDP_hw_last_intercept].u_val)
235                    || isnan(coefs[CDP_hw_last_slope].u_val)) {
236             /* this should never happen, as HWPREDICT was already updated */
237             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
238         } else if (isnan(seasonal)) {
239             /* initialization: intercept is not currently being updated */
240 #ifdef DEBUG
241             fprintf(stderr, "Initialization of seasonal coef %lu\n",
242                     rrd->rra_ptr[rra_idx].cur_row);
243 #endif
244             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
245                 -= coefs[CDP_hw_last_intercept].u_val;
246         } else {
247             intercept = coefs[CDP_hw_intercept].u_val;
248 #ifdef DEBUG
249             fprintf(stderr,
250                     "Updating seasonal, params: gamma %f, new intercept %f, old seasonal %f\n",
251                     current_rra->par[RRA_seasonal_gamma].u_val,
252                     intercept, seasonal);
253 #endif
254             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
255                 (current_rra->par[RRA_seasonal_gamma].u_val) *
256                 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
257                  intercept) + (1 -
258                                current_rra->par[RRA_seasonal_gamma].u_val) *
259                 seasonal;
260     } else {
261         /* SEASONAL array is updated first, which means the new intercept
262          * hasn't be computed; so we compute it here. */
263
264         /* check for possible NA values */
265         if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
266             /* no update, simple store the old value unchanged,
267              * doesn't matter if it is NA */
268             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
269         } else if (isnan(coefs[CDP_hw_intercept].u_val)
270                    || isnan(coefs[CDP_hw_slope].u_val)) {
271             /* Initialization of slope and intercept will occur.
272              * force seasonal coefficient to 0. */
273             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
274         } else if (isnan(seasonal)) {
275             /* initialization: intercept will not be updated
276              * CDP_hw_intercept = CDP_hw_last_intercept; just need to 
277              * subtract this baseline value. */
278 #ifdef DEBUG
279             fprintf(stderr, "Initialization of seasonal coef %lu\n",
280                     rrd->rra_ptr[rra_idx].cur_row);
281 #endif
282             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -=
283                 coefs[CDP_hw_intercept].u_val;
284         } else {
285             /* Note that we must get CDP_scratch_idx from SEASONAL array, as CDP_scratch_idx
286              * for HWPREDICT array will be DNAN. */
287             intercept = (hw_rra->par[RRA_hw_alpha].u_val) *
288                 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
289                  seasonal)
290                 + (1 -
291                    hw_rra->par[RRA_hw_alpha].u_val) *
292                 (coefs[CDP_hw_intercept].u_val +
293                  (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
294             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
295                 (current_rra->par[RRA_seasonal_gamma].u_val) *
296                 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
297                  intercept) + (1 -
298                                current_rra->par[RRA_seasonal_gamma].u_val) *
299                 seasonal;
300         }
301     }
302 #ifdef DEBUG
303     fprintf(stderr, "seasonal coefficient set= %f\n",
304             rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
305 #endif
306     return 0;
307 }
308
309 int update_devpredict(
310     rrd_t *rrd,
311     unsigned long cdp_idx,
312     unsigned long rra_idx,
313     unsigned long ds_idx,
314     unsigned short CDP_scratch_idx)
315 {
316     /* there really isn't any "update" here; the only reason this information
317      * is stored separately from DEVSEASONAL is to preserve deviation predictions
318      * for a longer duration than one seasonal cycle. */
319     unsigned long seasonal_cdp_idx =
320         (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
321         * (rrd->stat_head->ds_cnt) + ds_idx;
322
323     if (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx) {
324         /* associated DEVSEASONAL array already updated */
325         rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
326             =
327             rrd->cdp_prep[seasonal_cdp_idx].
328             scratch[CDP_last_seasonal_deviation].u_val;
329     } else {
330         /* associated DEVSEASONAL not yet updated */
331         rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
332             =
333             rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_seasonal_deviation].
334             u_val;
335     }
336     return 0;
337 }
338
/* Advance the DEVSEASONAL RRA one step for a single data source: smooth
 * the seasonal deviation (absolute forecast error) with the RRA's gamma
 * parameter.  Which HWPREDICT/SEASONAL scratch slots are read depends on
 * whether those RRAs were already updated earlier in this step (i.e. on
 * their index relative to rra_idx).
 *
 * rrd             in-core rrd structure; cdp_prep scratch areas updated
 * cdp_idx         cdp_prep index for this DEVSEASONAL RRA/DS pair
 * rra_idx         index of the DEVSEASONAL RRA
 * ds_idx          index of the data source
 * CDP_scratch_idx scratch slot holding the observed value on entry; the
 *                 smoothed deviation is written back to that slot
 * seasonal_dev    row of deviations read from disk (one per DS)
 * returns 0 (always)
 */
int update_devseasonal(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_dev)
{
    rrd_value_t prediction = 0, seasonal_coef = DNAN;
    rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);

    /* obtain cdp_prep index for HWPREDICT */
    unsigned long hw_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
    unsigned long hw_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
    unsigned long seasonal_cdp_idx;
    unival   *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;

    /* save the previous deviation before loading the next one */
    rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val =
        rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val;
    /* retrieve the next seasonal deviation value, could be NA */
    rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val =
        seasonal_dev[ds_idx];

    /* retrieve the current seasonal_coef (not to be confused with the
     * current seasonal deviation). Could make this more readable by introducing
     * some wrapper functions. */
    seasonal_cdp_idx =
        (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt)
        * (rrd->stat_head->ds_cnt) + ds_idx;
    if (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
        /* SEASONAL array already updated */
        seasonal_coef =
            rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
            u_val;
    else
        /* SEASONAL array not yet updated */
        seasonal_coef =
            rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;

    /* compute the abs value of the difference between the prediction and
     * observed value */
    if (hw_rra_idx < rra_idx) {
        /* associated HWPREDICT has already been updated */
        if (isnan(coefs[CDP_hw_last_intercept].u_val) ||
            isnan(coefs[CDP_hw_last_slope].u_val) || isnan(seasonal_coef)) {
            /* one of the prediction values is uninitialized */
            rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
            return 0;
        } else {
            /* use the saved (pre-update) coefficients for the forecast */
            prediction = coefs[CDP_hw_last_intercept].u_val +
                (coefs[CDP_hw_last_slope].u_val) *
                (coefs[CDP_last_null_count].u_cnt)
                + seasonal_coef;
        }
    } else {
        /* associated HWPREDICT has NOT been updated */
        if (isnan(coefs[CDP_hw_intercept].u_val) ||
            isnan(coefs[CDP_hw_slope].u_val) || isnan(seasonal_coef)) {
            /* one of the prediction values is uninitialized */
            rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
            return 0;
        } else {
            prediction = coefs[CDP_hw_intercept].u_val +
                (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
                + seasonal_coef;
        }
    }

    if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
        /* no update, store existing value unchanged, doesn't
         * matter if it is NA */
        rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
            rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
    } else
        if (isnan
            (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
             u_val)) {
        /* initialization */
#ifdef DEBUG
        fprintf(stderr, "Initialization of seasonal deviation\n");
#endif
        rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
            fabs(prediction -
                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
    } else {
        /* exponential smoothing update:
         * gamma * |forecast error| + (1 - gamma) * previous deviation */
        rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
            (rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
            fabs(prediction -
                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)
            + (1 -
               rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
            (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
             u_val);
    }
    return 0;
}
436
/* Check for a failure based on a threshold # of violations within the
 * specified window.  Consults three dependent RRAs (HWPREDICT, SEASONAL,
 * DEVSEASONAL) to rebuild the confidence band, classifies the current
 * observation as a violation or not, shifts it into the sliding window
 * stored in this CDP's scratch area, and writes 0.0/1.0 (no-failure /
 * failure) into the scratch slot.
 *
 * Returns the failure flag.  NOTE(review): the scratch value is a double
 * (0.0 or 1.0) implicitly converted to the int return — works, but an
 * explicit comparison would be cleaner; confirm no caller relies on the
 * exact conversion. */
int update_failures(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx)
{
    /* detection of a violation depends on 3 RRAs:
     * HWPREDICT, SEASONAL, and DEVSEASONAL */
    rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
    unsigned long dev_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
    rra_def_t *dev_rra = &(rrd->rra_def[dev_rra_idx]);
    unsigned long hw_rra_idx = dev_rra->par[RRA_dependent_rra_idx].u_cnt;
    rra_def_t *hw_rra = &(rrd->rra_def[hw_rra_idx]);
    unsigned long seasonal_rra_idx = hw_rra->par[RRA_dependent_rra_idx].u_cnt;
    unsigned long temp_cdp_idx;
    rrd_value_t deviation = DNAN;
    rrd_value_t seasonal_coef = DNAN;
    rrd_value_t prediction = DNAN;
    char      violation = 0;
    unsigned short violation_cnt = 0, i;
    char     *violations_array;

    /* usual checks to determine the order of the RRAs: an RRA updated
     * earlier in this step exposes its fresh value in the "last" slot */
    temp_cdp_idx = dev_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
    if (rra_idx < seasonal_rra_idx) {
        /* DEVSEASONAL not yet updated */
        deviation =
            rrd->cdp_prep[temp_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
    } else {
        /* DEVSEASONAL already updated */
        deviation =
            rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_seasonal_deviation].
            u_val;
    }
    if (!isnan(deviation)) {

        temp_cdp_idx = seasonal_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        if (rra_idx < seasonal_rra_idx) {
            /* SEASONAL not yet updated */
            seasonal_coef =
                rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_seasonal].u_val;
        } else {
            /* SEASONAL already updated */
            seasonal_coef =
                rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_seasonal].
                u_val;
        }
        /* in this code block, we know seasonal coef is not DNAN, because deviation is not
         * null */

        temp_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        if (rra_idx < hw_rra_idx) {
            /* HWPREDICT not yet updated */
            prediction =
                rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_intercept].u_val +
                (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_slope].u_val)
                * (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_null_count].u_cnt)
                + seasonal_coef;
        } else {
            /* HWPREDICT already updated */
            prediction =
                rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_intercept].
                u_val +
                (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_slope].u_val)
                *
                (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_null_count].
                 u_cnt)
                + seasonal_coef;
        }

        /* determine if the observed value is a violation: outside the
         * confidence band [prediction - delta_neg*dev,
         *                  prediction + delta_pos*dev] */
        if (!isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
            if (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val >
                prediction +
                (current_rra->par[RRA_delta_pos].u_val) * deviation
                || rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val <
                prediction -
                (current_rra->par[RRA_delta_neg].u_val) * deviation)
                violation = 1;
        } else {
            violation = 1;  /* count DNAN values as violations */
        }

    }

    /* determine if a failure has occurred and update the failure array;
     * the scratch area is deliberately aliased as a char array to hold
     * the per-step violation history (see erase_violations) */
    violation_cnt = violation;
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    for (i = current_rra->par[RRA_window_len].u_cnt; i > 1; i--) {
        /* shift the window one step and count prior violations */
        violations_array[i - 1] = violations_array[i - 2];
        violation_cnt += violations_array[i - 1];
    }
    violations_array[0] = violation;

    if (violation_cnt < current_rra->par[RRA_failure_threshold].u_cnt)
        /* not a failure */
        rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
    else
        rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 1.0;

    return (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
}
543
544 /* For the specified CDP prep area and the FAILURES RRA,
545  * erase all history of past violations.
546  */
547 void erase_violations(
548     rrd_t *rrd,
549     unsigned long cdp_idx,
550     unsigned long rra_idx)
551 {
552     unsigned short i;
553     char     *violations_array;
554
555     /* check that rra_idx is a CF_FAILURES array */
556     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
557 #ifdef DEBUG
558         fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
559                 rrd->rra_def[rra_idx].cf_nam);
560 #endif
561         return;
562     }
563 #ifdef DEBUG
564     fprintf(stderr, "scratch buffer before erase:\n");
565     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
566         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
567     }
568     fprintf(stderr, "\n");
569 #endif
570
571     /* WARNING: an array of longs on disk is treated as an array of chars
572      * in memory. */
573     violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
574     /* erase everything in the part of the CDP scratch array that will be
575      * used to store violations for the current window */
576     for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
577         violations_array[i - 1] = 0;
578     }
579 #ifdef DEBUG
580     fprintf(stderr, "scratch buffer after erase:\n");
581     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
582         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
583     }
584     fprintf(stderr, "\n");
585 #endif
586 }
587
588 /* Smooth a periodic array with a moving average: equal weights and
589  * length = 5% of the period. */
590 int apply_smoother(
591     rrd_t *rrd,
592     unsigned long rra_idx,
593     unsigned long rra_start,
594     rrd_file_t *rrd_file)
595 {
596     unsigned long i, j, k;
597     unsigned long totalbytes;
598     rrd_value_t *rrd_values;
599     unsigned long row_length = rrd->stat_head->ds_cnt;
600     unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
601     unsigned long offset;
602     FIFOqueue **buffers;
603     rrd_value_t *working_average;
604     rrd_value_t *baseline;
605
606     offset = floor(0.025 * row_count);
607     if (offset == 0)
608         return 0;       /* no smoothing */
609
610     /* allocate memory */
611     totalbytes = sizeof(rrd_value_t) * row_length * row_count;
612     rrd_values = (rrd_value_t *) malloc(totalbytes);
613     if (rrd_values == NULL) {
614         rrd_set_error("apply smoother: memory allocation failure");
615         return -1;
616     }
617
618     /* rra_start is at the beginning of this rra */
619     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
620         rrd_set_error("seek to rra %d failed", rra_start);
621         free(rrd_values);
622         return -1;
623     }
624     rrd_flush(rrd_file);
625     /* could read all data in a single block, but we need to
626      * check for NA values */
627     for (i = 0; i < row_count; ++i) {
628         for (j = 0; j < row_length; ++j) {
629             if (rrd_read
630                 (rrd_file, &(rrd_values[i * row_length + j]),
631                  sizeof(rrd_value_t) * 1)
632                 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
633                 rrd_set_error("reading value failed: %s",
634                               rrd_strerror(errno));
635             }
636             if (isnan(rrd_values[i * row_length + j])) {
637                 /* can't apply smoothing, still uninitialized values */
638 #ifdef DEBUG
639                 fprintf(stderr,
640                         "apply_smoother: NA detected in seasonal array: %ld %ld\n",
641                         i, j);
642 #endif
643                 free(rrd_values);
644                 return 0;
645             }
646         }
647     }
648
649     /* allocate queues, one for each data source */
650     buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
651     for (i = 0; i < row_length; ++i) {
652         queue_alloc(&(buffers[i]), 2 * offset + 1);
653     }
654     /* need working average initialized to 0 */
655     working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
656     baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
657
658     /* compute sums of the first 2*offset terms */
659     for (i = 0; i < 2 * offset; ++i) {
660         k = MyMod(i - offset, row_count);
661         for (j = 0; j < row_length; ++j) {
662             queue_push(buffers[j], rrd_values[k * row_length + j]);
663             working_average[j] += rrd_values[k * row_length + j];
664         }
665     }
666
667     /* compute moving averages */
668     for (i = offset; i < row_count + offset; ++i) {
669         for (j = 0; j < row_length; ++j) {
670             k = MyMod(i, row_count);
671             /* add a term to the sum */
672             working_average[j] += rrd_values[k * row_length + j];
673             queue_push(buffers[j], rrd_values[k * row_length + j]);
674
675             /* reset k to be the center of the window */
676             k = MyMod(i - offset, row_count);
677             /* overwrite rdd_values entry, the old value is already
678              * saved in buffers */
679             rrd_values[k * row_length + j] =
680                 working_average[j] / (2 * offset + 1);
681             baseline[j] += rrd_values[k * row_length + j];
682
683             /* remove a term from the sum */
684             working_average[j] -= queue_pop(buffers[j]);
685         }
686     }
687
688     for (i = 0; i < row_length; ++i) {
689         queue_dealloc(buffers[i]);
690         baseline[i] /= row_count;
691     }
692     free(buffers);
693     free(working_average);
694
695     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
696         for (j = 0; j < row_length; ++j) {
697             for (i = 0; i < row_count; ++i) {
698                 rrd_values[i * row_length + j] -= baseline[j];
699             }
700             /* update the baseline coefficient,
701              * first, compute the cdp_index. */
702             offset = (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
703                 * row_length + j;
704             (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
705                 baseline[j];
706         }
707         /* flush cdp to disk */
708         rrd_flush(rrd_file);
709         if (rrd_seek(rrd_file, sizeof(stat_head_t) +
710                      rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
711                      rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
712                      sizeof(live_head_t) +
713                      rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
714             rrd_set_error("apply_smoother: seek to cdp_prep failed");
715             free(rrd_values);
716             return -1;
717         }
718         if (rrd_write(rrd_file, rrd->cdp_prep,
719                       sizeof(cdp_prep_t) *
720                       (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
721             != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
722                           (rrd->stat_head->ds_cnt))) {
723             rrd_set_error("apply_smoother: cdp_prep write failed");
724             free(rrd_values);
725             return -1;
726         }
727     }
728
729     /* endif CF_SEASONAL */
730     /* flush updated values to disk */
731     rrd_flush(rrd_file);
732     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
733         rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
734         free(rrd_values);
735         return -1;
736     }
737     /* write as a single block */
738     if (rrd_write
739         (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
740         != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
741         rrd_set_error("apply_smoother: write failed to %lu", rra_start);
742         free(rrd_values);
743         return -1;
744     }
745
746     rrd_flush(rrd_file);
747     free(rrd_values);
748     free(baseline);
749     return 0;
750 }
751
/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source.
 * Updated cdp_prep scratch values are written back to the file at the end;
 * on a failed write rrd_set_error() is set and the reset may be partial. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* cdp_prep entries are laid out RRA-major: one per (rra, ds) pair */
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous, 
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                /* overwrite this DS's stored value with NaN, one row at a
                 * time */
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != sizeof(rrd_value_t) * 1) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                /* skip over the other data sources' values in this row */
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    /* persist the modified cdp_prep area in a single block write */
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}
823
824 void init_hwpredict_cdp(
825     cdp_prep_t *cdp)
826 {
827     cdp->scratch[CDP_hw_intercept].u_val = DNAN;
828     cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
829     cdp->scratch[CDP_hw_slope].u_val = DNAN;
830     cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
831     cdp->scratch[CDP_null_count].u_cnt = 1;
832     cdp->scratch[CDP_last_null_count].u_cnt = 1;
833 }
834
835 void init_seasonal_cdp(
836     cdp_prep_t *cdp)
837 {
838     cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
839     cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
840     cdp->scratch[CDP_init_seasonal].u_cnt = 1;
841 }
842
843 int update_aberrant_CF(
844     rrd_t *rrd,
845     rrd_value_t pdp_val,
846     enum cf_en current_cf,
847     unsigned long cdp_idx,
848     unsigned long rra_idx,
849     unsigned long ds_idx,
850     unsigned short CDP_scratch_idx,
851     rrd_value_t *seasonal_coef)
852 {
853     rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
854     switch (current_cf) {
855     case CF_AVERAGE:
856     default:
857         return 0;
858     case CF_HWPREDICT:
859         return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
860                                 CDP_scratch_idx);
861     case CF_DEVPREDICT:
862         return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
863                                  CDP_scratch_idx);
864     case CF_SEASONAL:
865         return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx, CDP_scratch_idx,
866                                seasonal_coef);
867     case CF_DEVSEASONAL:
868         return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
869                                   CDP_scratch_idx, seasonal_coef);
870     case CF_FAILURES:
871         return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
872                                CDP_scratch_idx);
873     }
874     return -1;
875 }
876
/* Mathematical modulus: return val mod 'mod' as a value in [0, mod).
 * The C '%' operator yields a negative remainder for negative operands,
 * which is unusable for wraparound array indexing.
 *
 * Fixes over the previous version:
 *  - 'abs' takes an int, so calling it on a signed long truncated the
 *    argument on LP64 platforms (and abs(LONG_MIN) is undefined); the
 *    magnitude is now taken with unsigned arithmetic, which is well
 *    defined for every input including LONG_MIN.
 *  - a negative exact multiple of 'mod' previously returned 'mod'
 *    itself (out of range); it now correctly returns 0. */
unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long remainder;

    if (val < 0) {
        /* -(unsigned long) val computes |val| without signed overflow */
        remainder = (-(unsigned long) val) % mod;
        /* a zero remainder must map to 0, not mod */
        return (remainder == 0) ? 0 : mod - remainder;
    }
    return ((unsigned long) val) % mod;
}
893
894 /* a standard fixed-capacity FIF0 queue implementation
895  * No overflow checking is performed. */
896 int queue_alloc(
897     FIFOqueue **q,
898     int capacity)
899 {
900     *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
901     if (*q == NULL)
902         return -1;
903     (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
904     if ((*q)->queue == NULL) {
905         free(*q);
906         return -1;
907     }
908     (*q)->capacity = capacity;
909     (*q)->head = capacity;
910     (*q)->tail = 0;
911     return 0;
912 }
913
914 int queue_isempty(
915     FIFOqueue *q)
916 {
917     return (q->head % q->capacity == q->tail);
918 }
919
920 void queue_push(
921     FIFOqueue *q,
922     rrd_value_t value)
923 {
924     q->queue[(q->tail)++] = value;
925     q->tail = q->tail % q->capacity;
926 }
927
928 rrd_value_t queue_pop(
929     FIFOqueue *q)
930 {
931     q->head = q->head % q->capacity;
932     return q->queue[(q->head)++];
933 }
934
935 void queue_dealloc(
936     FIFOqueue *q)
937 {
938     free(q->queue);
939     free(q);
940 }