[rrdtool.git] / src / rrd_hw.c
/*****************************************************************************
 * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include <stdlib.h>

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"

#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */

/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation between read
             * and write, because this read moves the file pointer to somewhere
             * in the file other than the next write location.
             * */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}
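
/* Usage sketch (illustrative, not part of the original file): a hypothetical
 * caller fetching the seasonal coefficients one row past cur_row.  The
 * coefficient buffer is allocated on first use by lookup_seasonal() itself
 * and must be freed by the caller. */
#ifdef RRD_HW_EXAMPLES  /* illustrative only, never compiled */
static int fetch_next_season(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long rra_idx,
    unsigned long rra_start)
{
    rrd_value_t *coef = NULL;   /* lookup_seasonal mallocs this for us */

    if (lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                        1 /* offset past cur_row */, &coef) == -1)
        return -1;
    /* ... use coef[0 .. ds_cnt-1] ... */
    free(coef);
    return 0;
}
#endif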

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char     *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}
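
/* Layout note (drawn from the erase loop above, added for orientation): the
 * FAILURES window is kept as one flag byte per past primary data point,
 * packed into the front of the unival scratch area.  That is why the code
 * aliases the scratch array as char[] and zeroes RRA_window_len bytes,
 * rather than addressing whole unival slots. */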

/* Smooth a periodic array with a moving average: equal weights and
 * length = 5% of the period. */
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;

    if (atoi(rrd->stat_head->version) >= 4) {
        offset = floor(rrd->rra_def[rra_idx].
                       par[RRA_seasonal_smoothing_window].u_val / 2 *
                       row_count);
    } else {
        offset = floor(0.05 / 2 * row_count);
    }

    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }
    rrd_flush(rrd_file);
    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t) * 1)
                != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    if (buffers == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        free(rrd_values);
        return -1;
    }
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));

    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod(i - offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }

    /* compute moving averages */
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite rrd_values entry, the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);

    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (*init_seasonality) (rrd_value_t seasonal_coef,
                                         rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient,
             * first, compute the cdp_index. */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
        /* flush cdp to disk */
        rrd_flush(rrd_file);
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }
    /* endif CF_SEASONAL */

    /* flush updated values to disk */
    rrd_flush(rrd_file);
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    rrd_flush(rrd_file);
    free(rrd_values);
    free(baseline);
    return 0;
}
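
/* Algorithm note (illustrative, not part of the original file): the smoother
 * above is the classic O(n) sliding-window average.  Instead of re-summing
 * all 2*offset+1 window terms per row, it keeps a running sum, adds the
 * value entering the window, and subtracts the value leaving it.  A minimal
 * out-of-place sketch over a plain circular array (assumes offset < n): */
#ifdef RRD_HW_EXAMPLES  /* illustrative only, never compiled */
static void moving_average(
    const double *x,    /* input: one period of n values */
    double *y,          /* output: smoothed period */
    unsigned long n,
    unsigned long offset)
{
    double    sum = 0.0;
    unsigned long i, w = 2 * offset + 1;

    /* prime the running sum with the window centered on index 0,
     * wrapping around the period */
    for (i = 0; i < w; i++)
        sum += x[(i + n - offset) % n];
    y[0] = sum / w;
    for (i = 1; i < n; i++) {
        sum -= x[(i + n - offset - 1) % n]; /* value leaving the window */
        sum += x[(i + offset) % n];         /* value entering the window */
        y[i] = sum / w;
    }
}
#endif
/* apply_smoother instead overwrites rrd_values in place, so it needs the
 * FIFO queues to retain each pre-smoothed value until it leaves the window;
 * a separate output array, as above, makes the queues unnecessary. */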

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous,
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}
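
/* On-disk layout assumed by the offset arithmetic above (a summary drawn
 * from the computations in this file, not an authoritative format spec):
 *
 *   stat_head_t                               file header
 *   ds_def_t    x ds_cnt                      data source definitions
 *   rra_def_t   x rra_cnt                     RRA definitions
 *   live_head_t                               last update time
 *   pdp_prep_t  x ds_cnt                      PDP scratch areas
 *   cdp_prep_t  x (rra_cnt * ds_cnt)          CDP scratch areas  <- cdp_start
 *   rra_ptr_t   x rra_cnt                     current-row pointers
 *   rrd_value_t x (row_cnt * ds_cnt) per RRA  the RRA data       <- rra_start
 */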

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}
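
/* Note (added, based on how these sentinels are used by the Holt-Winters
 * code): DNAN here means "coefficient not yet seeded"; the update routines
 * in rrd_hw_update.c detect the NaN and bootstrap the model from the first
 * observations instead of smoothing against uninitialized values. */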

int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}
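
/* Background note (standard textbook formulation, added for orientation;
 * the actual arithmetic lives in rrd_hw_math.c behind the hw_functions_t
 * tables above).  With smoothing parameters alpha, beta, gamma, period m,
 * observation y[t], intercept a, slope b, and seasonal coefficient c, the
 * additive Holt-Winters updates are:
 *
 *   prediction:  yhat[t] = a[t-1] + b[t-1] + c[t-m]
 *   intercept:   a[t] = alpha * (y[t] - c[t-m]) + (1 - alpha) * (a[t-1] + b[t-1])
 *   slope:       b[t] = beta  * (a[t] - a[t-1]) + (1 - beta)  * b[t-1]
 *   seasonal:    c[t] = gamma * (y[t] - a[t])   + (1 - gamma) * c[t-m]
 *
 * The multiplicative variant (CF_MHWPREDICT) applies c as a factor rather
 * than an addend, e.g. yhat[t] = (a[t-1] + b[t-1]) * c[t-m]; hence the two
 * identity values above: 0.0 for the additive model, 1.0 for the
 * multiplicative one. */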

static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;
    else
        new_val = (val % mod);

    if (val < 0)
        return (mod - new_val);
    else
        return (new_val);
}
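
/* Semantics note (illustrative): MyMod implements the mathematical modulus,
 * which stays in [0, mod) for negative operands, unlike C's % operator,
 * whose result takes the sign of the dividend.  It is used above to index
 * circularly into a seasonal period, e.g. the row "3 before the start" of
 * a 7-row period is row 4:
 *
 *   MyMod(-3, 7) == 4      whereas in C:  -3 % 7 == -3
 *   MyMod(10, 7) == 3      same as:       10 % 7 ==  3
 */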

/* a standard fixed-capacity FIFO queue implementation
 * No overflow checking is performed. */
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;
    (*q)->tail = 0;
    return 0;
}

int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}
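
/* Usage sketch (illustrative only): a push/pop round trip.  Because a
 * completely full queue also satisfies head % capacity == tail, the empty
 * test is only meaningful while the queue holds fewer than capacity values;
 * apply_smoother never calls queue_isempty and keeps pushes and pops in
 * lockstep against a window-sized queue, so this never matters there. */
#ifdef RRD_HW_EXAMPLES  /* illustrative only, never compiled */
static void queue_demo(void)
{
    FIFOqueue *q;

    if (queue_alloc(&q, 3) == -1)
        return;
    queue_push(q, 1.0);
    queue_push(q, 2.0);
    /* values come back out in insertion order: 1.0, then 2.0 */
    while (!queue_isempty(q))
        fprintf(stderr, "%g\n", queue_pop(q));
    queue_dealloc(q);
}
#endif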