Add custom message for threshold and missings.
[collectd.git] / src / utils_cache.c
1 /**
2  * collectd - src/utils_cache.c
3  * Copyright (C) 2007,2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "utils_avltree.h"
26 #include "utils_cache.h"
27 #include "utils_threshold.h"
28 #include "utils_subst.h"
29 #include "meta_data.h"
30
31 #include <assert.h>
32 #include <pthread.h>
33
34 typedef struct cache_entry_s
35 {
36         char name[6 * DATA_MAX_NAME_LEN];
37         int        values_num;
38         gauge_t   *values_gauge;
39         value_t   *values_raw;
40         /* Time contained in the package
41          * (for calculating rates) */
42         time_t last_time;
43         /* Time according to the local clock
44          * (for purging old entries) */
45         time_t last_update;
46         /* Interval in which the data is collected
47          * (for purding old entries) */
48         int interval;
49         int state;
50         int hits;
51
52         /*
53          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
54          * !  0  !  1  !  2  !  3  !  4  !  5  !  6  !  7  !  8  ! ...
55          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
56          * ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ...
57          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
58          * !      t = 0      !      t = 1      !      t = 2      ! ...
59          * +-----------------+-----------------+-----------------+----
60          */
61         gauge_t *history;
62         size_t   history_index; /* points to the next position to write to. */
63         size_t   history_length;
64
65         meta_data_t *meta;
66 } cache_entry_t;
67
68 static c_avl_tree_t   *cache_tree = NULL;
69 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
70
71 static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
72 {
73   assert ((a != NULL) && (b != NULL));
74   return (strcmp (a->name, b->name));
75 } /* int cache_compare */
76
77 static cache_entry_t *cache_alloc (int values_num)
78 {
79   cache_entry_t *ce;
80
81   ce = (cache_entry_t *) malloc (sizeof (cache_entry_t));
82   if (ce == NULL)
83   {
84     ERROR ("utils_cache: cache_alloc: malloc failed.");
85     return (NULL);
86   }
87   memset (ce, '\0', sizeof (cache_entry_t));
88   ce->values_num = values_num;
89
90   ce->values_gauge = calloc (values_num, sizeof (*ce->values_gauge));
91   ce->values_raw   = calloc (values_num, sizeof (*ce->values_raw));
92   if ((ce->values_gauge == NULL) || (ce->values_raw == NULL))
93   {
94     sfree (ce->values_gauge);
95     sfree (ce->values_raw);
96     sfree (ce);
97     ERROR ("utils_cache: cache_alloc: calloc failed.");
98     return (NULL);
99   }
100
101   ce->history = NULL;
102   ce->history_length = 0;
103   ce->meta = NULL;
104
105   return (ce);
106 } /* cache_entry_t *cache_alloc */
107
108 static void cache_free (cache_entry_t *ce)
109 {
110   if (ce == NULL)
111     return;
112
113   sfree (ce->values_gauge);
114   sfree (ce->values_raw);
115   sfree (ce->history);
116   if (ce->meta != NULL)
117   {
118     meta_data_destroy (ce->meta);
119     ce->meta = NULL;
120   }
121   sfree (ce);
122 } /* void cache_free */
123
124 static int uc_send_notification (const char *name)
125 {
126   cache_entry_t *ce = NULL;
127   int status;
128
129   char *name_copy;
130   char *host;
131   char *plugin;
132   char *plugin_instance;
133   char *type;
134   char *type_instance;
135   threshold_t th;
136
137   notification_t n;
138
139   data_set_t ds;
140   value_list_t vl;
141
142   name_copy = strdup (name);
143   if (name_copy == NULL)
144   {
145     ERROR ("uc_send_notification: strdup failed.");
146     return (-1);
147   }
148
149   status = parse_identifier (name_copy, &host,
150       &plugin, &plugin_instance,
151       &type, &type_instance);
152   if (status != 0)
153   {
154     ERROR ("uc_send_notification: Cannot parse name `%s'", name);
155     return (-1);
156   }
157
158   memset (&ds, '\0', sizeof (ds));
159   memset (&vl, '\0', sizeof (vl));
160   sstrncpy (vl.host, host, sizeof (vl.host));
161   sstrncpy (vl.plugin, plugin, sizeof (vl.plugin));
162   if (plugin_instance != NULL)
163     sstrncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
164   sstrncpy (ds.type, type, sizeof (ds.type));
165   sstrncpy (vl.type, type, sizeof (vl.type));
166   if (type_instance != NULL)
167     sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
168
169
170   /* Copy the associative members */
171   notification_init (&n, NOTIF_FAILURE, /* host = */ NULL,
172       host, plugin, plugin_instance, type, type_instance);
173
174   sfree (name_copy);
175   name_copy = host = plugin = plugin_instance = type = type_instance = NULL;
176
177
178   pthread_mutex_lock (&cache_lock);
179
180   /*
181    * Set the time _after_ getting the lock because we don't know how long
182    * acquiring the lock takes and we will use this time later to decide
183    * whether or not the state is OKAY.
184    */
185   n.time = time (NULL);
186
187   status = c_avl_get (cache_tree, name, (void *) &ce);
188   if (status != 0)
189   {
190     pthread_mutex_unlock (&cache_lock);
191     sfree (name_copy);
192     return (-1);
193   }
194     
195   /* Check if the entry has been updated in the meantime */
196   if ((n.time - ce->last_update) < (2 * ce->interval))
197   {
198     ce->state = STATE_OKAY;
199     pthread_mutex_unlock (&cache_lock);
200     sfree (name_copy);
201     return (-1);
202   }
203
204   /* if the associated threshold has a missing message, then use custom
205    * message. FIXME: we do a threshold_search here and in uc_check_timeout
206    * (calling ut_check_interesting, but we really need to do this once */
207   if ( !ut_search_threshold(&vl, &th) &&
208       (th.missing_message != NULL) )
209   {
210     char msg[NOTIF_MAX_MSG_LEN];
211     char temp[NOTIF_MAX_MSG_LEN];
212
213     sstrncpy (msg, th.missing_message, sizeof (msg));
214     (void) ut_build_message (msg, NOTIF_MAX_MSG_LEN, th.missing_message,
215         &ds, 0, &vl, ce->values_gauge,
216         &n, &th);
217
218 #define REPLACE_FIELD(t,v) \
219     if (subst_string (temp, sizeof (temp), msg, t, v) != NULL) \
220     sstrncpy (msg, temp, sizeof (msg));
221
222     char itoa_temp[NOTIF_MAX_MSG_LEN];
223 #define ITOA(string,i) \
224     memset(string,0x00,sizeof(string)); \
225     snprintf(string, sizeof(string), "%i", i);
226
227     ITOA(itoa_temp, (int)(n.time - ce->last_update))
228       REPLACE_FIELD("%{missing}", itoa_temp)
229
230     (void) ssnprintf (n.message, sizeof (n.message),
231         "%s", msg);
232   }
233   else
234   {
235     ssnprintf (n.message, sizeof (n.message),
236         "%s has not been updated for %i seconds.", name,
237         (int) (n.time - ce->last_update));
238   }
239
240   pthread_mutex_unlock (&cache_lock);
241
242   plugin_dispatch_notification (&n);
243
244   return (0);
245 } /* int uc_send_notification */
246
247 static void uc_check_range (const data_set_t *ds, cache_entry_t *ce)
248 {
249   int i;
250
251   for (i = 0; i < ds->ds_num; i++)
252   {
253     if (isnan (ce->values_gauge[i]))
254       continue;
255     else if (ce->values_gauge[i] < ds->ds[i].min)
256       ce->values_gauge[i] = NAN;
257     else if (ce->values_gauge[i] > ds->ds[i].max)
258       ce->values_gauge[i] = NAN;
259   }
260 } /* void uc_check_range */
261
262 static int uc_insert (const data_set_t *ds, const value_list_t *vl,
263     const char *key)
264 {
265   int i;
266   char *key_copy;
267   cache_entry_t *ce;
268
269   /* `cache_lock' has been locked by `uc_update' */
270
271   key_copy = strdup (key);
272   if (key_copy == NULL)
273   {
274     ERROR ("uc_insert: strdup failed.");
275     return (-1);
276   }
277
278   ce = cache_alloc (ds->ds_num);
279   if (ce == NULL)
280   {
281     sfree (key_copy);
282     ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
283     return (-1);
284   }
285
286   sstrncpy (ce->name, key, sizeof (ce->name));
287
288   for (i = 0; i < ds->ds_num; i++)
289   {
290     switch (ds->ds[i].type)
291     {
292       case DS_TYPE_COUNTER:
293         ce->values_gauge[i] = NAN;
294         ce->values_raw[i].counter = vl->values[i].counter;
295         break;
296
297       case DS_TYPE_GAUGE:
298         ce->values_gauge[i] = vl->values[i].gauge;
299         ce->values_raw[i].gauge = vl->values[i].gauge;
300         break;
301
302       case DS_TYPE_DERIVE:
303         ce->values_gauge[i] = NAN;
304         ce->values_raw[i].derive = vl->values[i].derive;
305         break;
306
307       case DS_TYPE_ABSOLUTE:
308         ce->values_gauge[i] = NAN;
309         if (vl->interval > 0)
310           ce->values_gauge[i] = ((double) vl->values[i].absolute)
311             / ((double) vl->interval);
312         ce->values_raw[i].absolute = vl->values[i].absolute;
313         break;
314         
315       default:
316         /* This shouldn't happen. */
317         ERROR ("uc_insert: Don't know how to handle data source type %i.",
318             ds->ds[i].type);
319         return (-1);
320     } /* switch (ds->ds[i].type) */
321   } /* for (i) */
322
323   /* Prune invalid gauge data */
324   uc_check_range (ds, ce);
325
326   ce->last_time = vl->time;
327   ce->last_update = time (NULL);
328   ce->interval = vl->interval;
329   ce->state = STATE_OKAY;
330
331   if (c_avl_insert (cache_tree, key_copy, ce) != 0)
332   {
333     sfree (key_copy);
334     ERROR ("uc_insert: c_avl_insert failed.");
335     return (-1);
336   }
337
338   DEBUG ("uc_insert: Added %s to the cache.", key);
339   return (0);
340 } /* int uc_insert */
341
342 int uc_init (void)
343 {
344   if (cache_tree == NULL)
345     cache_tree = c_avl_create ((int (*) (const void *, const void *))
346         cache_compare);
347
348   return (0);
349 } /* int uc_init */
350
351 int uc_check_timeout (void)
352 {
353   time_t now;
354   cache_entry_t *ce;
355
356   char **keys = NULL;
357   int keys_len = 0;
358
359   char *key;
360   c_avl_iterator_t *iter;
361   int i;
362   
363   pthread_mutex_lock (&cache_lock);
364
365   now = time (NULL);
366
367   /* Build a list of entries to be flushed */
368   iter = c_avl_get_iterator (cache_tree);
369   while (c_avl_iterator_next (iter, (void *) &key, (void *) &ce) == 0)
370   {
371     /* If entry has not been updated, add to `keys' array */
372     if ((now - ce->last_update) >= (timeout_g * ce->interval))
373     {
374       char **tmp;
375
376       tmp = (char **) realloc ((void *) keys,
377           (keys_len + 1) * sizeof (char *));
378       if (tmp == NULL)
379       {
380         ERROR ("uc_check_timeout: realloc failed.");
381         c_avl_iterator_destroy (iter);
382         sfree (keys);
383         pthread_mutex_unlock (&cache_lock);
384         return (-1);
385       }
386
387       keys = tmp;
388       keys[keys_len] = strdup (key);
389       if (keys[keys_len] == NULL)
390       {
391         ERROR ("uc_check_timeout: strdup failed.");
392         continue;
393       }
394       keys_len++;
395     }
396   } /* while (c_avl_iterator_next) */
397
398   ce = NULL;
399
400   for (i = 0; i < keys_len; i++)
401   {
402     int status;
403
404     status = ut_check_interesting (keys[i]);
405
406     if (status < 0)
407     {
408       ERROR ("uc_check_timeout: ut_check_interesting failed.");
409       sfree (keys[i]);
410       continue;
411     }
412     else if (status == 0) /* ``service'' is uninteresting */
413     {
414       DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
415           keys[i]);
416       ce = NULL;
417       status = c_avl_remove (cache_tree, keys[i],
418           (void *) &key, (void *) &ce);
419       if (status != 0)
420       {
421         ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
422       }
423       sfree (keys[i]);
424       sfree (key);
425       if (ce != NULL)
426         cache_free (ce);
427       continue;
428     }
429
430     /* If we get here, the value is ``interesting''. Query the record from the
431      * cache and update the state field. */
432     if (c_avl_get (cache_tree, keys[i], (void *) &ce) != 0)
433     {
434       ERROR ("uc_check_timeout: cannot get data for %s from cache", keys[i]);
435       /* Do not free `keys[i]' so a notification is sent further down. */
436       continue;
437     }
438     assert (ce != NULL);
439
440     if (status == 2) /* persist */
441     {
442       DEBUG ("uc_check_timeout: %s is missing, sending notification.",
443           keys[i]);
444       ce->state = STATE_MISSING;
445       /* Do not free `keys[i]' so a notification is sent further down. */
446     }
447     else if (status == 1) /* do not persist */
448     {
449       if (ce->state == STATE_MISSING)
450       {
451         DEBUG ("uc_check_timeout: %s is missing but "
452             "notification has already been sent.",
453             keys[i]);
454         /* Set `keys[i]' to NULL to no notification is sent. */
455         sfree (keys[i]);
456       }
457       else /* (ce->state != STATE_MISSING) */
458       {
459         DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
460             keys[i]);
461         ce->state = STATE_MISSING;
462         /* Do not free `keys[i]' so a notification is sent further down. */
463       }
464     }
465     else
466     {
467       WARNING ("uc_check_timeout: ut_check_interesting (%s) returned "
468           "invalid status %i.",
469           keys[i], status);
470       sfree (keys[i]);
471     }
472
473     /* Make really sure the next iteration doesn't work with this pointer.
474      * There have been too many bugs in the past.. :/  -- octo */
475     ce = NULL;
476   } /* for (keys[i]) */
477
478   c_avl_iterator_destroy (iter);
479
480   pthread_mutex_unlock (&cache_lock);
481
482   for (i = 0; i < keys_len; i++)
483   {
484     if (keys[i] == NULL)
485       continue;
486
487     uc_send_notification (keys[i]);
488     sfree (keys[i]);
489   }
490
491   sfree (keys);
492
493   return (0);
494 } /* int uc_check_timeout */
495
496 int uc_update (const data_set_t *ds, const value_list_t *vl)
497 {
498   char name[6 * DATA_MAX_NAME_LEN];
499   cache_entry_t *ce = NULL;
500   int send_okay_notification = 0;
501   time_t update_delay = 0;
502   notification_t n;
503   int status;
504   int i;
505
506   if (FORMAT_VL (name, sizeof (name), vl) != 0)
507   {
508     ERROR ("uc_update: FORMAT_VL failed.");
509     return (-1);
510   }
511
512   pthread_mutex_lock (&cache_lock);
513
514   status = c_avl_get (cache_tree, name, (void *) &ce);
515   if (status != 0) /* entry does not yet exist */
516   {
517     status = uc_insert (ds, vl, name);
518     pthread_mutex_unlock (&cache_lock);
519     return (status);
520   }
521
522   assert (ce != NULL);
523   assert (ce->values_num == ds->ds_num);
524
525   if (ce->last_time >= vl->time)
526   {
527     pthread_mutex_unlock (&cache_lock);
528     NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
529         "last cache update = %u;",
530         name, (unsigned int) vl->time, (unsigned int) ce->last_time);
531     return (-1);
532   }
533
534   /* Send a notification (after the lock has been released) if we switch the
535    * state from something else to `okay'. */
536   if (ce->state == STATE_MISSING)
537   {
538     send_okay_notification = 1;
539     ce->state = STATE_OKAY;
540     update_delay = time (NULL) - ce->last_update;
541   }
542
543   for (i = 0; i < ds->ds_num; i++)
544   {
545     switch (ds->ds[i].type)
546     {
547       case DS_TYPE_COUNTER:
548         {
549           counter_t diff;
550
551           /* check if the counter has wrapped around */
552           if (vl->values[i].counter < ce->values_raw[i].counter)
553           {
554             if (ce->values_raw[i].counter <= 4294967295U)
555               diff = (4294967295U - ce->values_raw[i].counter)
556                 + vl->values[i].counter;
557             else
558               diff = (18446744073709551615ULL - ce->values_raw[i].counter)
559                 + vl->values[i].counter;
560           }
561           else /* counter has NOT wrapped around */
562           {
563             diff = vl->values[i].counter - ce->values_raw[i].counter;
564           }
565
566           ce->values_gauge[i] = ((double) diff)
567             / ((double) (vl->time - ce->last_time));
568           ce->values_raw[i].counter = vl->values[i].counter;
569         }
570         break;
571
572       case DS_TYPE_GAUGE:
573         ce->values_raw[i].gauge = vl->values[i].gauge;
574         ce->values_gauge[i] = vl->values[i].gauge;
575         break;
576
577       case DS_TYPE_DERIVE:
578         {
579           derive_t diff;
580
581           diff = vl->values[i].derive - ce->values_raw[i].derive;
582
583           ce->values_gauge[i] = ((double) diff)
584             / ((double) (vl->time - ce->last_time));
585           ce->values_raw[i].derive = vl->values[i].derive;
586         }
587         break;
588
589       case DS_TYPE_ABSOLUTE:
590         ce->values_gauge[i] = ((double) vl->values[i].absolute)
591           / ((double) (vl->time - ce->last_time));
592         ce->values_raw[i].absolute = vl->values[i].absolute;
593         break;
594
595       default:
596         /* This shouldn't happen. */
597         pthread_mutex_unlock (&cache_lock);
598         ERROR ("uc_update: Don't know how to handle data source type %i.",
599             ds->ds[i].type);
600         return (-1);
601     } /* switch (ds->ds[i].type) */
602
603     DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
604   } /* for (i) */
605
606   /* Update the history if it exists. */
607   if (ce->history != NULL)
608   {
609     assert (ce->history_index < ce->history_length);
610     for (i = 0; i < ce->values_num; i++)
611     {
612       size_t hist_idx = (ce->values_num * ce->history_index) + i;
613       ce->history[hist_idx] = ce->values_gauge[i];
614     }
615
616     assert (ce->history_length > 0);
617     ce->history_index = (ce->history_index + 1) % ce->history_length;
618   }
619
620   /* Prune invalid gauge data */
621   uc_check_range (ds, ce);
622
623   ce->last_time = vl->time;
624   ce->last_update = time (NULL);
625   ce->interval = vl->interval;
626
627   pthread_mutex_unlock (&cache_lock);
628
629   if (send_okay_notification == 0)
630     return (0);
631
632   /* Do not send okay notifications for uninteresting values, i. e. values for
633    * which no threshold is configured. */
634   status = ut_check_interesting (name);
635   if (status <= 0)
636     return (0);
637
638   /* Initialize the notification */
639   memset (&n, '\0', sizeof (n));
640   NOTIFICATION_INIT_VL (&n, vl, ds);
641
642   n.severity = NOTIF_OKAY;
643   n.time = vl->time;
644
645   ssnprintf (n.message, sizeof (n.message),
646       "Received a value for %s. It was missing for %u seconds.",
647       name, (unsigned int) update_delay);
648
649   plugin_dispatch_notification (&n);
650
651   return (0);
652 } /* int uc_update */
653
654 int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
655 {
656   gauge_t *ret = NULL;
657   size_t ret_num = 0;
658   cache_entry_t *ce = NULL;
659   int status = 0;
660
661   pthread_mutex_lock (&cache_lock);
662
663   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
664   {
665     assert (ce != NULL);
666
667     /* remove missing values from getval */
668     if (ce->state == STATE_MISSING)
669     {
670       status = -1;
671     }
672     else
673     {
674       ret_num = ce->values_num;
675       ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
676       if (ret == NULL)
677       {
678         ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
679         status = -1;
680       }
681       else
682       {
683         memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
684       }
685     }
686   }
687   else
688   {
689     DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
690     status = -1;
691   }
692
693   pthread_mutex_unlock (&cache_lock);
694
695   if (status == 0)
696   {
697     *ret_values = ret;
698     *ret_values_num = ret_num;
699   }
700
701   return (status);
702 } /* gauge_t *uc_get_rate_by_name */
703
704 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
705 {
706   char name[6 * DATA_MAX_NAME_LEN];
707   gauge_t *ret = NULL;
708   size_t ret_num = 0;
709   int status;
710
711   if (FORMAT_VL (name, sizeof (name), vl) != 0)
712   {
713     ERROR ("utils_cache: uc_get_rate: FORMAT_VL failed.");
714     return (NULL);
715   }
716
717   status = uc_get_rate_by_name (name, &ret, &ret_num);
718   if (status != 0)
719     return (NULL);
720
721   /* This is important - the caller has no other way of knowing how many
722    * values are returned. */
723   if (ret_num != (size_t) ds->ds_num)
724   {
725     ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
726         "but uc_get_rate_by_name returned %zu.",
727         ds->type, ds->ds_num, ret_num);
728     sfree (ret);
729     return (NULL);
730   }
731
732   return (ret);
733 } /* gauge_t *uc_get_rate */
734
735 int uc_get_names (char ***ret_names, time_t **ret_times, size_t *ret_number)
736 {
737   c_avl_iterator_t *iter;
738   char *key;
739   cache_entry_t *value;
740
741   char **names = NULL;
742   time_t *times = NULL;
743   size_t number = 0;
744
745   int status = 0;
746
747   if ((ret_names == NULL) || (ret_number == NULL))
748     return (-1);
749
750   pthread_mutex_lock (&cache_lock);
751
752   iter = c_avl_get_iterator (cache_tree);
753   while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
754   {
755     char **temp;
756
757     /* remove missing values when list values */
758     if (value->state == STATE_MISSING)
759       continue;
760
761     if (ret_times != NULL)
762     {
763       time_t *tmp_times;
764
765       tmp_times = (time_t *) realloc (times, sizeof (time_t) * (number + 1));
766       if (tmp_times == NULL)
767       {
768         status = -1;
769         break;
770       }
771       times = tmp_times;
772       times[number] = value->last_time;
773     }
774
775     temp = (char **) realloc (names, sizeof (char *) * (number + 1));
776     if (temp == NULL)
777     {
778       status = -1;
779       break;
780     }
781     names = temp;
782     names[number] = strdup (key);
783     if (names[number] == NULL)
784     {
785       status = -1;
786       break;
787     }
788     number++;
789   } /* while (c_avl_iterator_next) */
790
791   c_avl_iterator_destroy (iter);
792   pthread_mutex_unlock (&cache_lock);
793
794   if (status != 0)
795   {
796     size_t i;
797     
798     for (i = 0; i < number; i++)
799     {
800       sfree (names[i]);
801     }
802     sfree (names);
803
804     return (-1);
805   }
806
807   *ret_names = names;
808   if (ret_times != NULL)
809     *ret_times = times;
810   *ret_number = number;
811
812   return (0);
813 } /* int uc_get_names */
814
815 int uc_get_state (const data_set_t *ds, const value_list_t *vl)
816 {
817   char name[6 * DATA_MAX_NAME_LEN];
818   cache_entry_t *ce = NULL;
819   int ret = STATE_ERROR;
820
821   if (FORMAT_VL (name, sizeof (name), vl) != 0)
822   {
823     ERROR ("uc_get_state: FORMAT_VL failed.");
824     return (STATE_ERROR);
825   }
826
827   pthread_mutex_lock (&cache_lock);
828
829   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
830   {
831     assert (ce != NULL);
832     ret = ce->state;
833   }
834
835   pthread_mutex_unlock (&cache_lock);
836
837   return (ret);
838 } /* int uc_get_state */
839
840 int uc_set_state (const data_set_t *ds, const value_list_t *vl, int state)
841 {
842   char name[6 * DATA_MAX_NAME_LEN];
843   cache_entry_t *ce = NULL;
844   int ret = -1;
845
846   if (FORMAT_VL (name, sizeof (name), vl) != 0)
847   {
848     ERROR ("uc_get_state: FORMAT_VL failed.");
849     return (STATE_ERROR);
850   }
851
852   pthread_mutex_lock (&cache_lock);
853
854   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
855   {
856     assert (ce != NULL);
857     ret = ce->state;
858     ce->state = state;
859   }
860
861   pthread_mutex_unlock (&cache_lock);
862
863   return (ret);
864 } /* int uc_set_state */
865
866 int uc_get_history_by_name (const char *name,
867     gauge_t *ret_history, size_t num_steps, size_t num_ds)
868 {
869   cache_entry_t *ce = NULL;
870   size_t i;
871   int status = 0;
872
873   pthread_mutex_lock (&cache_lock);
874
875   status = c_avl_get (cache_tree, name, (void *) &ce);
876   if (status != 0)
877   {
878     pthread_mutex_unlock (&cache_lock);
879     return (-ENOENT);
880   }
881
882   if (((size_t) ce->values_num) != num_ds)
883   {
884     pthread_mutex_unlock (&cache_lock);
885     return (-EINVAL);
886   }
887
888   /* Check if there are enough values available. If not, increase the buffer
889    * size. */
890   if (ce->history_length < num_steps)
891   {
892     gauge_t *tmp;
893     size_t i;
894
895     tmp = realloc (ce->history, sizeof (*ce->history)
896         * num_steps * ce->values_num);
897     if (tmp == NULL)
898     {
899       pthread_mutex_unlock (&cache_lock);
900       return (-ENOMEM);
901     }
902
903     for (i = ce->history_length * ce->values_num;
904         i < (num_steps * ce->values_num);
905         i++)
906       tmp[i] = NAN;
907
908     ce->history = tmp;
909     ce->history_length = num_steps;
910   } /* if (ce->history_length < num_steps) */
911
912   /* Copy the values to the output buffer. */
913   for (i = 0; i < num_steps; i++)
914   {
915     size_t src_index;
916     size_t dst_index;
917
918     if (i < ce->history_index)
919       src_index = ce->history_index - (i + 1);
920     else
921       src_index = ce->history_length + ce->history_index - (i + 1);
922     src_index = src_index * num_ds;
923
924     dst_index = i * num_ds;
925
926     memcpy (ret_history + dst_index, ce->history + src_index,
927         sizeof (*ret_history) * num_ds);
928   }
929
930   pthread_mutex_unlock (&cache_lock);
931
932   return (0);
933 } /* int uc_get_history_by_name */
934
935 int uc_get_history (const data_set_t *ds, const value_list_t *vl,
936     gauge_t *ret_history, size_t num_steps, size_t num_ds)
937 {
938   char name[6 * DATA_MAX_NAME_LEN];
939
940   if (FORMAT_VL (name, sizeof (name), vl) != 0)
941   {
942     ERROR ("utils_cache: uc_get_history: FORMAT_VL failed.");
943     return (-1);
944   }
945
946   return (uc_get_history_by_name (name, ret_history, num_steps, num_ds));
947 } /* int uc_get_history */
948
949 int uc_get_hits (const data_set_t *ds, const value_list_t *vl)
950 {
951   char name[6 * DATA_MAX_NAME_LEN];
952   cache_entry_t *ce = NULL;
953   int ret = STATE_ERROR;
954
955   if (FORMAT_VL (name, sizeof (name), vl) != 0)
956   {
957     ERROR ("uc_get_state: FORMAT_VL failed.");
958     return (STATE_ERROR);
959   }
960
961   pthread_mutex_lock (&cache_lock);
962
963   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
964   {
965     assert (ce != NULL);
966     ret = ce->hits;
967   }
968
969   pthread_mutex_unlock (&cache_lock);
970
971   return (ret);
972 } /* int uc_get_hits */
973
974 int uc_set_hits (const data_set_t *ds, const value_list_t *vl, int hits)
975 {
976   char name[6 * DATA_MAX_NAME_LEN];
977   cache_entry_t *ce = NULL;
978   int ret = -1;
979
980   if (FORMAT_VL (name, sizeof (name), vl) != 0)
981   {
982     ERROR ("uc_get_state: FORMAT_VL failed.");
983     return (STATE_ERROR);
984   }
985
986   pthread_mutex_lock (&cache_lock);
987
988   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
989   {
990     assert (ce != NULL);
991     ret = ce->hits;
992     ce->hits = hits;
993   }
994
995   pthread_mutex_unlock (&cache_lock);
996
997   return (ret);
998 } /* int uc_set_hits */
999
1000 int uc_inc_hits (const data_set_t *ds, const value_list_t *vl, int step)
1001 {
1002   char name[6 * DATA_MAX_NAME_LEN];
1003   cache_entry_t *ce = NULL;
1004   int ret = -1;
1005
1006   if (FORMAT_VL (name, sizeof (name), vl) != 0)
1007   {
1008     ERROR ("uc_get_state: FORMAT_VL failed.");
1009     return (STATE_ERROR);
1010   }
1011
1012   pthread_mutex_lock (&cache_lock);
1013
1014   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
1015   {
1016     assert (ce != NULL);
1017     ret = ce->hits;
1018     ce->hits = ret + step;
1019   }
1020
1021   pthread_mutex_unlock (&cache_lock);
1022
1023   return (ret);
1024 } /* int uc_inc_hits */
1025
1026 /*
1027  * Meta data interface
1028  */
1029 /* XXX: This function will acquire `cache_lock' but will not free it! */
1030 static meta_data_t *uc_get_meta (const value_list_t *vl) /* {{{ */
1031 {
1032   char name[6 * DATA_MAX_NAME_LEN];
1033   cache_entry_t *ce = NULL;
1034   int status;
1035
1036   status = FORMAT_VL (name, sizeof (name), vl);
1037   if (status != 0)
1038   {
1039     ERROR ("utils_cache: uc_get_meta: FORMAT_VL failed.");
1040     return (NULL);
1041   }
1042
1043   pthread_mutex_lock (&cache_lock);
1044
1045   status = c_avl_get (cache_tree, name, (void *) &ce);
1046   if (status != 0)
1047   {
1048     pthread_mutex_unlock (&cache_lock);
1049     return (NULL);
1050   }
1051   assert (ce != NULL);
1052
1053   if (ce->meta == NULL)
1054     ce->meta = meta_data_create ();
1055
1056   if (ce->meta == NULL)
1057     pthread_mutex_unlock (&cache_lock);
1058
1059   return (ce->meta);
1060 } /* }}} meta_data_t *uc_get_meta */
1061
1062 /* Sorry about this preprocessor magic, but it really makes this file much
1063  * shorter.. */
1064 #define UC_WRAP(wrap_function) { \
1065   meta_data_t *meta; \
1066   int status; \
1067   meta = uc_get_meta (vl); \
1068   if (meta == NULL) return (-1); \
1069   status = wrap_function (meta, key); \
1070   pthread_mutex_unlock (&cache_lock); \
1071   return (status); \
1072 }
1073 int uc_meta_data_exists (const value_list_t *vl, const char *key)
1074   UC_WRAP (meta_data_exists)
1075
1076 int uc_meta_data_delete (const value_list_t *vl, const char *key)
1077   UC_WRAP (meta_data_delete)
1078 #undef UC_WRAP
1079
1080 /* We need a new version of this macro because the following functions take
1081  * two argumetns. */
1082 #define UC_WRAP(wrap_function) { \
1083   meta_data_t *meta; \
1084   int status; \
1085   meta = uc_get_meta (vl); \
1086   if (meta == NULL) return (-1); \
1087   status = wrap_function (meta, key, value); \
1088   pthread_mutex_unlock (&cache_lock); \
1089   return (status); \
1090 }
1091 int uc_meta_data_add_string (const value_list_t *vl,
1092     const char *key,
1093     const char *value)
1094   UC_WRAP(meta_data_add_string)
1095 int uc_meta_data_add_signed_int (const value_list_t *vl,
1096     const char *key,
1097     int64_t value)
1098   UC_WRAP(meta_data_add_signed_int)
1099 int uc_meta_data_add_unsigned_int (const value_list_t *vl,
1100     const char *key,
1101     uint64_t value)
1102   UC_WRAP(meta_data_add_unsigned_int)
1103 int uc_meta_data_add_double (const value_list_t *vl,
1104     const char *key,
1105     double value)
1106   UC_WRAP(meta_data_add_double)
1107 int uc_meta_data_add_boolean (const value_list_t *vl,
1108     const char *key,
1109     _Bool value)
1110   UC_WRAP(meta_data_add_boolean)
1111
1112 int uc_meta_data_get_string (const value_list_t *vl,
1113     const char *key,
1114     char **value)
1115   UC_WRAP(meta_data_get_string)
1116 int uc_meta_data_get_signed_int (const value_list_t *vl,
1117     const char *key,
1118     int64_t *value)
1119   UC_WRAP(meta_data_get_signed_int)
1120 int uc_meta_data_get_unsigned_int (const value_list_t *vl,
1121     const char *key,
1122     uint64_t *value)
1123   UC_WRAP(meta_data_get_unsigned_int)
1124 int uc_meta_data_get_double (const value_list_t *vl,
1125     const char *key,
1126     double *value)
1127   UC_WRAP(meta_data_get_double)
1128 int uc_meta_data_get_boolean (const value_list_t *vl,
1129     const char *key,
1130     _Bool *value)
1131   UC_WRAP(meta_data_get_boolean)
1132 #undef UC_WRAP
1133
1134 /* vim: set sw=2 ts=8 sts=2 tw=78 : */