There are two popular variants of the Holt-Winters forecasting method; RRDtool implements both: HWPREDICT applies additive seasonality, while MHWPREDICT applies multiplicative seasonality. The code below selects between the two via per-variant function tables.

/*****************************************************************************
 * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing / Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"

#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */

/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation
             * between read and write, because this read moves the file
             * pointer to somewhere in the file other than the next write
             * location. */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}
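
/*
 * Illustrative use (a sketch, not taken from the original callers; names
 * are hypothetical): read the seasonal coefficients one row ahead of the
 * write pointer and release the buffer afterwards.
 *
 *     rrd_value_t *coef = NULL;
 *     if (lookup_seasonal(rrd, seasonal_rra_idx, seasonal_rra_start,
 *                         rrd_file, 1, &coef) == 0) {
 *         use coef[0] .. coef[rrd->stat_head->ds_cnt - 1]
 *         free(coef);
 *     }
 */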

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char     *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}

/* Smooth a periodic array with a moving average: equal weights and
 * length = 5% of the period. */
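/* Worked example of the window arithmetic below (numbers are illustrative):
 * for a seasonal period of row_count = 288 rows, offset = floor(0.025 * 288)
 * = 7, so the moving-average window spans 2 * 7 + 1 = 15 rows, roughly 5%
 * of the period, centered on the row being smoothed. */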
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;

    offset = floor(0.025 * row_count);
    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }
    rrd_flush(rrd_file);
    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t))
                != (ssize_t) sizeof(rrd_value_t)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    if (buffers == NULL || working_average == NULL || baseline == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        free(buffers);
        free(working_average);
        free(baseline);
        free(rrd_values);
        return -1;
    }
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }

    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod(i - offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }

    /* compute moving averages */
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite the rrd_values entry; the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);

    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (*init_seasonality) (
            rrd_value_t seasonal_coef,
            rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient,
             * first, compute the cdp_index. */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
        /* flush cdp to disk */
        rrd_flush(rrd_file);
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }                   /* endif CF_SEASONAL */

    /* flush updated values to disk */
    rrd_flush(rrd_file);
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    rrd_flush(rrd_file);
    free(rrd_values);
    free(baseline);
    return 0;
}

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

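    /* The offset arithmetic below mirrors the on-disk layout of an rrd file
     * as used throughout this file: stat_head, then ds_cnt ds_def entries,
     * rra_cnt rra_def entries, live_head, ds_cnt pdp_prep entries,
     * rra_cnt * ds_cnt cdp_prep entries, rra_cnt rra_ptr entries, and then
     * the RRA data rows themselves. */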
    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous;
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t))
                    != (ssize_t) sizeof(rrd_value_t)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}

int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}
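
/*
 * Illustrative call (a sketch; argument names are placeholders, not taken
 * from the original callers): updating a SEASONAL rra whose dependent rra
 * is HWPREDICT selects the additive function table above, and -1 signals a
 * missing or invalid dependency.
 *
 *     if (update_aberrant_CF(rrd, pdp_temp, CF_SEASONAL, cdp_idx, rra_idx,
 *                            ds_idx, CDP_scratch_idx, seasonal_coef) < 0)
 *         handle the invalid dependency
 */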

static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;    /* labs: val is long */
    else
        new_val = (val % mod);

    /* guard against returning mod itself when val is a negative multiple */
    if (val < 0 && new_val > 0)
        return (mod - new_val);
    else
        return (new_val);
}
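
/*
 * MyMod implements a mathematical (always in-range) modulus, which C's %
 * operator does not provide for negative operands. For example,
 * MyMod(-3, 288) == 285 and MyMod(290, 288) == 2; apply_smoother() relies
 * on this to index rows near the wrap-around point of the seasonal array.
 */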

/* A standard fixed-capacity FIFO queue implementation.
 * No overflow checking is performed. */
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;  /* reduced modulo capacity on first pop */
    (*q)->tail = 0;
    return 0;
}

int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}
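
/*
 * Minimal usage sketch for the queue (illustrative, not part of the
 * original source). apply_smoother() above allocates one queue per data
 * source with capacity 2 * offset + 1 and cycles values through it:
 *
 *     FIFOqueue *q;
 *     if (queue_alloc(&q, 3) == 0) {
 *         queue_push(q, 1.0);
 *         queue_push(q, 2.0);
 *         rrd_value_t oldest = queue_pop(q);
 *         (queue_pop returns 1.0, the value pushed first)
 *         queue_dealloc(q);
 *     }
 */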