src/utils_cache.[ch]: Added the `uc_check_timeout' function.
[collectd.git] / src / utils_cache.c
1 /**
2  * collectd - src/utils_cache.c
3  * Copyright (C) 2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "utils_avltree.h"
26
27 #include <assert.h>
28 #include <pthread.h>
29
30 typedef struct cache_entry_s
31 {
32         char name[6 * DATA_MAX_NAME_LEN];
33         int        values_num;
34         gauge_t   *values_gauge;
35         counter_t *values_counter;
36         /* Time contained in the package
37          * (for calculating rates) */
38         time_t last_time;
39         /* Time according to the local clock
40          * (for purging old entries) */
41         time_t last_update;
42         /* Interval in which the data is collected
43          * (for purding old entries) */
44         int interval;
45 } cache_entry_t;
46
47 static avl_tree_t     *cache_tree = NULL;
48 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
49
50 static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
51 {
52   assert ((a != NULL) && (b != NULL));
53   return (strcmp (a->name, b->name));
54 } /* int cache_compare */
55
56 static int uc_send_notification (const char *name)
57 {
58   cache_entry_t *ce = NULL;
59   int status;
60
61   char *name_copy;
62   char *host;
63   char *plugin;
64   char *plugin_instance;
65   char *type;
66   char *type_instance;
67
68   notification_t n;
69
70   memset (&n, '\0', sizeof (n));
71
72   name_copy = strdup (name);
73   if (name_copy == NULL)
74   {
75     ERROR ("uc_send_notification: strdup failed.");
76     return (-1);
77   }
78
79   status = parse_identifier (name_copy, &host,
80       &plugin, &plugin_instance,
81       &type, &type_instance);
82   if (status != 0)
83   {
84     ERROR ("uc_send_notification: Cannot parse name `%s'", name);
85     return (-1);
86   }
87
88   n.severity = NOTIF_FAILURE;
89   strncpy (n.host, host, sizeof (n.host));
90   n.host[sizeof (n.host) - 1] = '\0';
91
92   sfree (name_copy);
93   name_copy = host = plugin = plugin_instance = type = type_instance = NULL;
94
95   pthread_mutex_lock (&cache_lock);
96
97   n.time = time (NULL);
98
99   status = avl_get (cache_tree, name, (void *) &ce);
100   if (status != 0)
101   {
102     pthread_mutex_unlock (&cache_lock);
103     sfree (name_copy);
104     return (-1);
105   }
106     
107   /* Check if the entry has been updated in the meantime */
108   if ((n.time - ce->last_update) <= (2 * ce->interval))
109   {
110     pthread_mutex_unlock (&cache_lock);
111     sfree (name_copy);
112     return (-1);
113   }
114
115   snprintf (n.message, sizeof (n.message),
116       "%s has not been updated for %i seconds.", name,
117       (int) (ce->last_update - n.time));
118
119   pthread_mutex_unlock (&cache_lock);
120
121   n.message[sizeof (n.message) - 1] = '\0';
122   plugin_dispatch_notification (&n);
123
124   return (0);
125 } /* int uc_send_notification */
126
127 int uc_init (void)
128 {
129   if (cache_tree == NULL)
130     cache_tree = avl_create ((int (*) (const void *, const void *))
131         cache_compare);
132
133   return (0);
134 } /* int uc_init */
135
136 int uc_check_timeout (void)
137 {
138   time_t now;
139   cache_entry_t *ce;
140
141   char **keys = NULL;
142   int keys_len = 0;
143
144   char *key;
145   avl_iterator_t *iter;
146   int i;
147   
148   pthread_mutex_lock (&cache_lock);
149
150   now = time (NULL);
151
152   /* Build a list of entries to be flushed */
153   iter = avl_get_iterator (cache_tree);
154   while (avl_iterator_next (iter, (void *) &key, (void *) &ce) == 0)
155   {
156     /* If entry has not been updated, add to `keys' array */
157     if ((now - ce->last_update) > (2 * ce->interval))
158     {
159       char **tmp;
160
161       tmp = (char **) realloc ((void *) keys,
162           (keys_len + 1) * sizeof (char *));
163       if (tmp == NULL)
164       {
165         ERROR ("uc_purge: realloc failed.");
166         avl_iterator_destroy (iter);
167         return (-1);
168       }
169
170       keys = tmp;
171       keys[keys_len] = strdup (key);
172       if (keys[keys_len] == NULL)
173       {
174         ERROR ("uc_check_timeout: strdup failed.");
175         continue;
176       }
177       keys_len++;
178     }
179   } /* while (avl_iterator_next) */
180
181   for (i = 0; i < keys_len; i++)
182   {
183     int status;
184
185     /* TODO: Check if value interesting:
186      * - Not interesting: Remove value from cache and shut up
187      * - Interesting:     Don't remove value from cache but send a
188      *                    notification.
189      */
190     status = avl_remove (cache_tree, keys[i], (void *) &key, (void *) &ce);
191     if (status != 0)
192     {
193       ERROR ("uc_check_timeout: avl_remove (%s) failed.", keys[i]);
194       continue;
195     }
196
197     sfree (key);
198     sfree (ce);
199   }
200
201   avl_iterator_destroy (iter);
202
203   pthread_mutex_unlock (&cache_lock);
204
205   for (i = 0; i < keys_len; i++)
206   {
207     uc_send_notification (keys[i]);
208     sfree (keys[i]);
209   }
210
211   sfree (keys);
212
213   return (0);
214 } /* int uc_check_timeout */
215
216 int uc_update (const data_set_t *ds, const value_list_t *vl)
217 {
218   char name[6 * DATA_MAX_NAME_LEN];
219   cache_entry_t *ce = NULL;
220
221   if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
222   {
223     ERROR ("uc_insert: FORMAT_VL failed.");
224     return (-1);
225   }
226
227   pthread_mutex_lock (&cache_lock);
228
229   if (avl_get (cache_tree, name, (void *) &ce) == 0)
230   {
231     int i;
232
233     assert (ce != NULL);
234     assert (ce->values_num == ds->ds_num);
235
236     if (ce->last_time >= vl->time)
237     {
238       pthread_mutex_unlock (&cache_lock);
239       NOTICE ("uc_insert: Value too old: name = %s; value time = %u; "
240           "last cache update = %u;",
241           name, (unsigned int) vl->time, (unsigned int) ce->last_time);
242       return (-1);
243     }
244
245     if ((ce->last_time + ce->interval) < vl->time)
246     {
247       /* TODO: Implement a `real' okay notification. Watch out for locking
248        * issues, though! */
249       NOTICE ("uc_insert: Okay notification for %s", name);
250     }
251
252     for (i = 0; i < ds->ds_num; i++)
253     {
254       if (ds->ds[i].type == DS_TYPE_COUNTER)
255       {
256         counter_t diff;
257
258         /* check if the counter has wrapped around */
259         if (vl->values[i].counter < ce->values_counter[i])
260         {
261           if (ce->values_counter[i] <= 4294967295U)
262             diff = (4294967295U - ce->values_counter[i])
263               + vl->values[i].counter;
264           else
265             diff = (18446744073709551615ULL - ce->values_counter[i])
266               + vl->values[i].counter;
267         }
268         else /* counter has NOT wrapped around */
269         {
270           diff = vl->values[i].counter - ce->values_counter[i];
271         }
272
273         ce->values_gauge[i] = ((double) diff)
274           / ((double) (vl->time - ce->last_time));
275         ce->values_counter[i] = vl->values[i].counter;
276       }
277       else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
278       {
279         ce->values_gauge[i] = vl->values[i].gauge;
280       }
281       DEBUG ("uc_insert: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
282     } /* for (i) */
283
284     ce->last_time = vl->time;
285     ce->last_update = time (NULL);
286     ce->interval = vl->interval;
287   }
288   else /* key is not found */
289   {
290     int i;
291     size_t ce_size = sizeof (cache_entry_t)
292       + ds->ds_num * (sizeof (counter_t) + sizeof (gauge_t));
293     char *key;
294     
295     key = strdup (name);
296     if (key == NULL)
297     {
298       pthread_mutex_unlock (&cache_lock);
299       ERROR ("uc_insert: strdup failed.");
300       return (-1);
301     }
302
303     ce = (cache_entry_t *) malloc (ce_size);
304     if (ce == NULL)
305     {
306       pthread_mutex_unlock (&cache_lock);
307       ERROR ("uc_insert: malloc (%u) failed.", (unsigned int) ce_size);
308       return (-1);
309     }
310
311     memset (ce, '\0', ce_size);
312
313     strncpy (ce->name, name, sizeof (ce->name));
314     ce->name[sizeof (ce->name) - 1] = '\0';
315
316     ce->values_num = ds->ds_num;
317     ce->values_gauge = (gauge_t *) (ce + 1);
318     ce->values_counter = (counter_t *) (ce->values_gauge + ce->values_num);
319
320     for (i = 0; i < ds->ds_num; i++)
321     {
322       if (ds->ds[i].type == DS_TYPE_COUNTER)
323       {
324         ce->values_gauge[i] = NAN;
325         ce->values_counter[i] = vl->values[i].counter;
326       }
327       else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
328       {
329         ce->values_gauge[i] = vl->values[i].gauge;
330       }
331     } /* for (i) */
332
333     ce->last_time = vl->time;
334
335     if (avl_insert (cache_tree, key, ce) != 0)
336     {
337       pthread_mutex_unlock (&cache_lock);
338       ERROR ("uc_insert: avl_insert failed.");
339       return (-1);
340     }
341
342     DEBUG ("uc_insert: Added %s to the cache.", name);
343   } /* if (key is not found) */
344
345   pthread_mutex_unlock (&cache_lock);
346
347   return (0);
348 } /* int uc_insert */
349
350 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
351 {
352   char name[6 * DATA_MAX_NAME_LEN];
353   gauge_t *ret = NULL;
354   cache_entry_t *ce = NULL;
355
356   if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
357   {
358     ERROR ("uc_insert: FORMAT_VL failed.");
359     return (NULL);
360   }
361
362   pthread_mutex_lock (&cache_lock);
363
364   if (avl_get (cache_tree, name, (void *) &ce) == 0)
365   {
366     assert (ce != NULL);
367     assert (ce->values_num == ds->ds_num);
368
369     ret = (gauge_t *) malloc (ce->values_num * sizeof (gauge_t));
370     if (ret == NULL)
371     {
372       ERROR ("uc_get_rate: malloc failed.");
373     }
374     else
375     {
376       memcpy (ret, ce->values_gauge, ce->values_num * sizeof (gauge_t));
377     }
378   }
379
380   pthread_mutex_unlock (&cache_lock);
381
382   return (ret);
383 } /* gauge_t *uc_get_rate */
384
385 /* vim: set sw=2 ts=8 sts=2 tw=78 : */