/*****************************************************************************
 * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing / Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"

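/* hw_dep_idx(rrd, i) reads the index of the RRA that RRA i depends on from
 * its parameter block.  A SEASONAL or DEVSEASONAL RRA points at its
 * HWPREDICT/MHWPREDICT RRA, and a FAILURES RRA points at a DEVSEASONAL RRA,
 * so its HWPREDICT RRA is two hops away (see update_aberrant_CF below). */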
#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */
/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
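    /* each row holds one rrd_value_t per data source, so a row occupies
     * ds_cnt * sizeof(rrd_value_t) bytes; e.g. with ds_cnt == 2 and an
     * 8-byte rrd_value_t, row 3 starts 48 bytes past rra_start */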
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation
             * between read and write, because this read moves the file
             * pointer to somewhere in the file other than the next write
             * location */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char     *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}

/* Smooth a periodic array with a moving average: equal weights and
 * length = 5% of the period. */
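/* For a period of row_count rows and a half-width of "offset" rows, each
 * seasonal coefficient is replaced by the centered moving average
 *   smoothed[i] = ( sum of x[MyMod(k, row_count)] for k = i-offset .. i+offset )
 *                 / (2*offset + 1)
 * computed independently for every data source. */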
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;

    if (atoi(rrd->stat_head->version) >= 4) {
        offset = floor(rrd->rra_def[rra_idx].
                       par[RRA_seasonal_smoothing_window].u_val
                       / 2 * row_count);
    } else {
        offset = floor(0.05 / 2 * row_count);
    }
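    /* offset is the half-width of the smoothing window: a window covering
     * the fraction w of the period spans about w * row_count rows, i.e.
     * offset rows on each side of the center row.  Version 4 and later
     * files store w as a per-RRA parameter; older files use a fixed 5%. */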

    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }
    rrd_flush(rrd_file);
    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t) * 1)
                != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));

    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod(i - offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }
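    /* each queue now holds the 2*offset values around row 0 (rows -offset
     * .. offset-1, wrapped), and working_average[j] is their sum; each pass
     * below pushes the leading edge of the window and pops the trailing
     * edge */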

    /* compute moving averages */
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite the rrd_values entry; the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }
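    /* rrd_values[] now holds the smoothed coefficients, and baseline[j]
     * holds the sum of the smoothed series for data source j; dividing by
     * row_count below yields the mean, which is folded into the dependent
     * RRA's intercept and passed to init_seasonality to re-center the
     * seasonal coefficients. */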

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);

    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (*init_seasonality) (
            rrd_value_t seasonal_coef,
            rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient;
             * first, compute the cdp index */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
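        /* cdp_prep[] is laid out rra-major (rra_idx * ds_cnt + ds_idx), so
         * the entry above addresses data source j of the dependent
         * HWPREDICT/MHWPREDICT RRA; note that "offset" is reused here as
         * that cdp index, not as the window half-width */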
        /* flush cdp to disk */
        rrd_flush(rrd_file);
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }
    /* endif CF_SEASONAL */

    /* flush updated values to disk */
    rrd_flush(rrd_file);
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    rrd_flush(rrd_file);
    free(rrd_values);
    free(baseline);
    return 0;
}

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
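    /* these offsets follow the on-disk layout of an RRD file:
     *   stat_head, ds_def[ds_cnt], rra_def[rra_cnt], live_head,
     *   pdp_prep[ds_cnt], cdp_prep[rra_cnt * ds_cnt], rra_ptr[rra_cnt],
     * followed by the RRA data blocks themselves */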

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous;
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}

int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };
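    /* the two tables differ only in how seasonality combines with the
     * baseline: additive Holt-Winters adds the seasonal coefficient
     * (identity value 0.0), multiplicative Holt-Winters scales by it
     * (identity value 1.0) */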

    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}

static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;
    else
        new_val = (val % mod);

    /* for negative val, return the complement, guarding the case where
     * val is an exact multiple of mod and new_val is already 0 */
    if (val < 0 && new_val > 0)
        return (mod - new_val);
    else
        return (new_val);
}
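/* Examples: MyMod(7, 5) == 2, MyMod(-3, 5) == 2 (negative indices wrap to
 * the same row), and MyMod(-5, 5) == 0.  Used above to wrap window indices
 * around the period. */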

/* A standard fixed-capacity FIFO queue implementation.
 * No overflow checking is performed. */
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;
    (*q)->tail = 0;
    return 0;
}
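/* head starts at capacity so that the first pop wraps it to index 0 (see
 * queue_pop), while queue_isempty's head % capacity == tail test reports a
 * freshly allocated queue as empty.  apply_smoother sizes each queue at
 * 2*offset + 1 and never holds more than that many values, so the lack of
 * overflow checking is safe there. */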

int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}