src/utils_{cache,threshold}.c: Don't replace %{data_source} and %{value} …
[collectd.git] / src / utils_cache.c
1 /**
2  * collectd - src/utils_cache.c
3  * Copyright (C) 2007,2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "utils_avltree.h"
26 #include "utils_cache.h"
27 #include "utils_threshold.h"
28 #include "utils_subst.h"
29 #include "meta_data.h"
30
31 #include <assert.h>
32 #include <pthread.h>
33
34 typedef struct cache_entry_s
35 {
36         char name[6 * DATA_MAX_NAME_LEN];
37         int        values_num;
38         gauge_t   *values_gauge;
39         value_t   *values_raw;
40         /* Time contained in the package
41          * (for calculating rates) */
42         time_t last_time;
43         /* Time according to the local clock
44          * (for purging old entries) */
45         time_t last_update;
46         /* Interval in which the data is collected
47          * (for purding old entries) */
48         int interval;
49         int state;
50         int hits;
51
52         /*
53          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
54          * !  0  !  1  !  2  !  3  !  4  !  5  !  6  !  7  !  8  ! ...
55          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
56          * ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ...
57          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
58          * !      t = 0      !      t = 1      !      t = 2      ! ...
59          * +-----------------+-----------------+-----------------+----
60          */
61         gauge_t *history;
62         size_t   history_index; /* points to the next position to write to. */
63         size_t   history_length;
64
65         meta_data_t *meta;
66 } cache_entry_t;
67
68 static c_avl_tree_t   *cache_tree = NULL;
69 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
70
71 static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
72 {
73   assert ((a != NULL) && (b != NULL));
74   return (strcmp (a->name, b->name));
75 } /* int cache_compare */
76
77 static cache_entry_t *cache_alloc (int values_num)
78 {
79   cache_entry_t *ce;
80
81   ce = (cache_entry_t *) malloc (sizeof (cache_entry_t));
82   if (ce == NULL)
83   {
84     ERROR ("utils_cache: cache_alloc: malloc failed.");
85     return (NULL);
86   }
87   memset (ce, '\0', sizeof (cache_entry_t));
88   ce->values_num = values_num;
89
90   ce->values_gauge = calloc (values_num, sizeof (*ce->values_gauge));
91   ce->values_raw   = calloc (values_num, sizeof (*ce->values_raw));
92   if ((ce->values_gauge == NULL) || (ce->values_raw == NULL))
93   {
94     sfree (ce->values_gauge);
95     sfree (ce->values_raw);
96     sfree (ce);
97     ERROR ("utils_cache: cache_alloc: calloc failed.");
98     return (NULL);
99   }
100
101   ce->history = NULL;
102   ce->history_length = 0;
103   ce->meta = NULL;
104
105   return (ce);
106 } /* cache_entry_t *cache_alloc */
107
108 static void cache_free (cache_entry_t *ce)
109 {
110   if (ce == NULL)
111     return;
112
113   sfree (ce->values_gauge);
114   sfree (ce->values_raw);
115   sfree (ce->history);
116   if (ce->meta != NULL)
117   {
118     meta_data_destroy (ce->meta);
119     ce->meta = NULL;
120   }
121   sfree (ce);
122 } /* void cache_free */
123
124 /* Send out notifications for "missing" values. */
125 static int uc_send_notification (const char *name)
126 {
127   cache_entry_t *ce = NULL;
128   int status;
129
130   char *name_copy;
131   char *host;
132   char *plugin;
133   char *plugin_instance;
134   char *type;
135   char *type_instance;
136   threshold_t th;
137
138   notification_t n;
139
140   data_set_t ds;
141   value_list_t vl;
142
143   name_copy = strdup (name);
144   if (name_copy == NULL)
145   {
146     ERROR ("uc_send_notification: strdup failed.");
147     return (-1);
148   }
149
150   status = parse_identifier (name_copy, &host,
151       &plugin, &plugin_instance,
152       &type, &type_instance);
153   if (status != 0)
154   {
155     ERROR ("uc_send_notification: Cannot parse name `%s'", name);
156     return (-1);
157   }
158
159   memset (&ds, 0, sizeof (ds));
160   memset (&vl, 0, sizeof (vl));
161
162   sstrncpy (vl.host, host, sizeof (vl.host));
163   sstrncpy (vl.plugin, plugin, sizeof (vl.plugin));
164   if (plugin_instance != NULL)
165     sstrncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
166   sstrncpy (ds.type, type, sizeof (ds.type));
167   sstrncpy (vl.type, type, sizeof (vl.type));
168   if (type_instance != NULL)
169     sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
170
171   /* Copy the associative members */
172   notification_init (&n, NOTIF_FAILURE, /* host = */ NULL,
173       host, plugin, plugin_instance, type, type_instance);
174
175   sfree (name_copy);
176   name_copy = host = plugin = plugin_instance = type = type_instance = NULL;
177
178
179   pthread_mutex_lock (&cache_lock);
180
181   /*
182    * Set the time _after_ getting the lock because we don't know how long
183    * acquiring the lock takes and we will use this time later to decide
184    * whether or not the state is OKAY.
185    */
186   n.time = time (NULL);
187
188   status = c_avl_get (cache_tree, name, (void *) &ce);
189   if (status != 0)
190   {
191     pthread_mutex_unlock (&cache_lock);
192     sfree (name_copy);
193     return (-1);
194   }
195     
196   /* Check if the entry has been updated since we marked it as "missing" in
197    * uc_check_timeout() */
198   if ((n.time - ce->last_update) < (2 * ce->interval))
199   {
200     ce->state = STATE_OKAY;
201     pthread_mutex_unlock (&cache_lock);
202     sfree (name_copy);
203     return (-1);
204   }
205
206   /* if the associated threshold has a missing message, then use custom
207    * message. FIXME: we do a threshold_search here and in uc_check_timeout
208    * (calling ut_check_interesting, but we really need to do this once */
209   status = ut_search_threshold (&vl, &th);
210   while ((status == 0) && (th.missing_message != NULL))
211   {
212     char msg[NOTIF_MAX_MSG_LEN];
213     char temp[NOTIF_MAX_MSG_LEN];
214     char missing_str[32] = "";
215
216     sstrncpy (msg, th.missing_message, sizeof (msg));
217     status = ut_build_message (msg, sizeof (msg),
218         /* format = */ th.missing_message,
219         &ds, /* ds index = */ -1,
220         &vl, ce->values_gauge,
221         &n, &th);
222     if (status != 0)
223       break;
224
225     (void) ssnprintf (missing_str, sizeof (missing_str), "%li",
226         (long) (n.time - ce->last_update));
227
228     if (subst_string (temp, sizeof (temp), msg,
229           "%{missing}", missing_str) != NULL)
230       sstrncpy (msg, temp, sizeof (msg));
231
232     sstrncpy (n.message, msg, sizeof (n.message));
233     break;
234   }
235
236   /* "ut_search_threshold" returned an error, there is no "missing_message" or
237    * "ut_build_message" failed. Use the generic default. */
238   if ((status != 0) || (th.missing_message == NULL))
239   {
240     ssnprintf (n.message, sizeof (n.message),
241         "\"%s\" has not been updated for %li seconds.", name,
242         (long) (n.time - ce->last_update));
243   }
244
245   pthread_mutex_unlock (&cache_lock);
246
247   plugin_dispatch_notification (&n);
248
249   return (0);
250 } /* int uc_send_notification */
251
252 static void uc_check_range (const data_set_t *ds, cache_entry_t *ce)
253 {
254   int i;
255
256   for (i = 0; i < ds->ds_num; i++)
257   {
258     if (isnan (ce->values_gauge[i]))
259       continue;
260     else if (ce->values_gauge[i] < ds->ds[i].min)
261       ce->values_gauge[i] = NAN;
262     else if (ce->values_gauge[i] > ds->ds[i].max)
263       ce->values_gauge[i] = NAN;
264   }
265 } /* void uc_check_range */
266
267 static int uc_insert (const data_set_t *ds, const value_list_t *vl,
268     const char *key)
269 {
270   int i;
271   char *key_copy;
272   cache_entry_t *ce;
273
274   /* `cache_lock' has been locked by `uc_update' */
275
276   key_copy = strdup (key);
277   if (key_copy == NULL)
278   {
279     ERROR ("uc_insert: strdup failed.");
280     return (-1);
281   }
282
283   ce = cache_alloc (ds->ds_num);
284   if (ce == NULL)
285   {
286     sfree (key_copy);
287     ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
288     return (-1);
289   }
290
291   sstrncpy (ce->name, key, sizeof (ce->name));
292
293   for (i = 0; i < ds->ds_num; i++)
294   {
295     switch (ds->ds[i].type)
296     {
297       case DS_TYPE_COUNTER:
298         ce->values_gauge[i] = NAN;
299         ce->values_raw[i].counter = vl->values[i].counter;
300         break;
301
302       case DS_TYPE_GAUGE:
303         ce->values_gauge[i] = vl->values[i].gauge;
304         ce->values_raw[i].gauge = vl->values[i].gauge;
305         break;
306
307       case DS_TYPE_DERIVE:
308         ce->values_gauge[i] = NAN;
309         ce->values_raw[i].derive = vl->values[i].derive;
310         break;
311
312       case DS_TYPE_ABSOLUTE:
313         ce->values_gauge[i] = NAN;
314         if (vl->interval > 0)
315           ce->values_gauge[i] = ((double) vl->values[i].absolute)
316             / ((double) vl->interval);
317         ce->values_raw[i].absolute = vl->values[i].absolute;
318         break;
319         
320       default:
321         /* This shouldn't happen. */
322         ERROR ("uc_insert: Don't know how to handle data source type %i.",
323             ds->ds[i].type);
324         return (-1);
325     } /* switch (ds->ds[i].type) */
326   } /* for (i) */
327
328   /* Prune invalid gauge data */
329   uc_check_range (ds, ce);
330
331   ce->last_time = vl->time;
332   ce->last_update = time (NULL);
333   ce->interval = vl->interval;
334   ce->state = STATE_OKAY;
335
336   if (c_avl_insert (cache_tree, key_copy, ce) != 0)
337   {
338     sfree (key_copy);
339     ERROR ("uc_insert: c_avl_insert failed.");
340     return (-1);
341   }
342
343   DEBUG ("uc_insert: Added %s to the cache.", key);
344   return (0);
345 } /* int uc_insert */
346
347 int uc_init (void)
348 {
349   if (cache_tree == NULL)
350     cache_tree = c_avl_create ((int (*) (const void *, const void *))
351         cache_compare);
352
353   return (0);
354 } /* int uc_init */
355
356 int uc_check_timeout (void)
357 {
358   time_t now;
359   cache_entry_t *ce;
360
361   char **keys = NULL;
362   int keys_len = 0;
363
364   char *key;
365   c_avl_iterator_t *iter;
366   int i;
367   
368   pthread_mutex_lock (&cache_lock);
369
370   now = time (NULL);
371
372   /* Build a list of entries to be flushed */
373   iter = c_avl_get_iterator (cache_tree);
374   while (c_avl_iterator_next (iter, (void *) &key, (void *) &ce) == 0)
375   {
376     /* If entry has not been updated, add to `keys' array */
377     if ((now - ce->last_update) >= (timeout_g * ce->interval))
378     {
379       char **tmp;
380
381       tmp = (char **) realloc ((void *) keys,
382           (keys_len + 1) * sizeof (char *));
383       if (tmp == NULL)
384       {
385         ERROR ("uc_check_timeout: realloc failed.");
386         c_avl_iterator_destroy (iter);
387         sfree (keys);
388         pthread_mutex_unlock (&cache_lock);
389         return (-1);
390       }
391
392       keys = tmp;
393       keys[keys_len] = strdup (key);
394       if (keys[keys_len] == NULL)
395       {
396         ERROR ("uc_check_timeout: strdup failed.");
397         continue;
398       }
399       keys_len++;
400     }
401   } /* while (c_avl_iterator_next) */
402
403   ce = NULL;
404
405   for (i = 0; i < keys_len; i++)
406   {
407     int status;
408
409     status = ut_check_interesting (keys[i]);
410
411     if (status < 0)
412     {
413       ERROR ("uc_check_timeout: ut_check_interesting failed.");
414       sfree (keys[i]);
415       continue;
416     }
417     else if (status == 0) /* ``service'' is uninteresting */
418     {
419       DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
420           keys[i]);
421       ce = NULL;
422       status = c_avl_remove (cache_tree, keys[i],
423           (void *) &key, (void *) &ce);
424       if (status != 0)
425       {
426         ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
427       }
428       sfree (keys[i]);
429       sfree (key);
430       if (ce != NULL)
431         cache_free (ce);
432       continue;
433     }
434
435     /* If we get here, the value is ``interesting''. Query the record from the
436      * cache and update the state field. */
437     if (c_avl_get (cache_tree, keys[i], (void *) &ce) != 0)
438     {
439       ERROR ("uc_check_timeout: cannot get data for %s from cache", keys[i]);
440       /* Do not free `keys[i]' so a notification is sent further down. */
441       continue;
442     }
443     assert (ce != NULL);
444
445     if (status == 2) /* persist */
446     {
447       DEBUG ("uc_check_timeout: %s is missing, sending notification.",
448           keys[i]);
449       ce->state = STATE_MISSING;
450       /* Do not free `keys[i]' so a notification is sent further down. */
451     }
452     else if (status == 1) /* do not persist */
453     {
454       if (ce->state == STATE_MISSING)
455       {
456         DEBUG ("uc_check_timeout: %s is missing but "
457             "notification has already been sent.",
458             keys[i]);
459         /* Set `keys[i]' to NULL to no notification is sent. */
460         sfree (keys[i]);
461       }
462       else /* (ce->state != STATE_MISSING) */
463       {
464         DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
465             keys[i]);
466         ce->state = STATE_MISSING;
467         /* Do not free `keys[i]' so a notification is sent further down. */
468       }
469     }
470     else
471     {
472       WARNING ("uc_check_timeout: ut_check_interesting (%s) returned "
473           "invalid status %i.",
474           keys[i], status);
475       sfree (keys[i]);
476     }
477
478     /* Make really sure the next iteration doesn't work with this pointer.
479      * There have been too many bugs in the past.. :/  -- octo */
480     ce = NULL;
481   } /* for (keys[i]) */
482
483   c_avl_iterator_destroy (iter);
484
485   pthread_mutex_unlock (&cache_lock);
486
487   for (i = 0; i < keys_len; i++)
488   {
489     if (keys[i] == NULL)
490       continue;
491
492     uc_send_notification (keys[i]);
493     sfree (keys[i]);
494   }
495
496   sfree (keys);
497
498   return (0);
499 } /* int uc_check_timeout */
500
501 int uc_update (const data_set_t *ds, const value_list_t *vl)
502 {
503   char name[6 * DATA_MAX_NAME_LEN];
504   cache_entry_t *ce = NULL;
505   int send_okay_notification = 0;
506   time_t update_delay = 0;
507   notification_t n;
508   int status;
509   int i;
510
511   if (FORMAT_VL (name, sizeof (name), vl) != 0)
512   {
513     ERROR ("uc_update: FORMAT_VL failed.");
514     return (-1);
515   }
516
517   pthread_mutex_lock (&cache_lock);
518
519   status = c_avl_get (cache_tree, name, (void *) &ce);
520   if (status != 0) /* entry does not yet exist */
521   {
522     status = uc_insert (ds, vl, name);
523     pthread_mutex_unlock (&cache_lock);
524     return (status);
525   }
526
527   assert (ce != NULL);
528   assert (ce->values_num == ds->ds_num);
529
530   if (ce->last_time >= vl->time)
531   {
532     pthread_mutex_unlock (&cache_lock);
533     NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
534         "last cache update = %u;",
535         name, (unsigned int) vl->time, (unsigned int) ce->last_time);
536     return (-1);
537   }
538
539   /* Send a notification (after the lock has been released) if we switch the
540    * state from something else to `okay'. */
541   if (ce->state == STATE_MISSING)
542   {
543     send_okay_notification = 1;
544     ce->state = STATE_OKAY;
545     update_delay = time (NULL) - ce->last_update;
546   }
547
548   for (i = 0; i < ds->ds_num; i++)
549   {
550     switch (ds->ds[i].type)
551     {
552       case DS_TYPE_COUNTER:
553         {
554           counter_t diff;
555
556           /* check if the counter has wrapped around */
557           if (vl->values[i].counter < ce->values_raw[i].counter)
558           {
559             if (ce->values_raw[i].counter <= 4294967295U)
560               diff = (4294967295U - ce->values_raw[i].counter)
561                 + vl->values[i].counter;
562             else
563               diff = (18446744073709551615ULL - ce->values_raw[i].counter)
564                 + vl->values[i].counter;
565           }
566           else /* counter has NOT wrapped around */
567           {
568             diff = vl->values[i].counter - ce->values_raw[i].counter;
569           }
570
571           ce->values_gauge[i] = ((double) diff)
572             / ((double) (vl->time - ce->last_time));
573           ce->values_raw[i].counter = vl->values[i].counter;
574         }
575         break;
576
577       case DS_TYPE_GAUGE:
578         ce->values_raw[i].gauge = vl->values[i].gauge;
579         ce->values_gauge[i] = vl->values[i].gauge;
580         break;
581
582       case DS_TYPE_DERIVE:
583         {
584           derive_t diff;
585
586           diff = vl->values[i].derive - ce->values_raw[i].derive;
587
588           ce->values_gauge[i] = ((double) diff)
589             / ((double) (vl->time - ce->last_time));
590           ce->values_raw[i].derive = vl->values[i].derive;
591         }
592         break;
593
594       case DS_TYPE_ABSOLUTE:
595         ce->values_gauge[i] = ((double) vl->values[i].absolute)
596           / ((double) (vl->time - ce->last_time));
597         ce->values_raw[i].absolute = vl->values[i].absolute;
598         break;
599
600       default:
601         /* This shouldn't happen. */
602         pthread_mutex_unlock (&cache_lock);
603         ERROR ("uc_update: Don't know how to handle data source type %i.",
604             ds->ds[i].type);
605         return (-1);
606     } /* switch (ds->ds[i].type) */
607
608     DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
609   } /* for (i) */
610
611   /* Update the history if it exists. */
612   if (ce->history != NULL)
613   {
614     assert (ce->history_index < ce->history_length);
615     for (i = 0; i < ce->values_num; i++)
616     {
617       size_t hist_idx = (ce->values_num * ce->history_index) + i;
618       ce->history[hist_idx] = ce->values_gauge[i];
619     }
620
621     assert (ce->history_length > 0);
622     ce->history_index = (ce->history_index + 1) % ce->history_length;
623   }
624
625   /* Prune invalid gauge data */
626   uc_check_range (ds, ce);
627
628   ce->last_time = vl->time;
629   ce->last_update = time (NULL);
630   ce->interval = vl->interval;
631
632   pthread_mutex_unlock (&cache_lock);
633
634   if (send_okay_notification == 0)
635     return (0);
636
637   /* Do not send okay notifications for uninteresting values, i. e. values for
638    * which no threshold is configured. */
639   status = ut_check_interesting (name);
640   if (status <= 0)
641     return (0);
642
643   /* Initialize the notification */
644   memset (&n, '\0', sizeof (n));
645   NOTIFICATION_INIT_VL (&n, vl, ds);
646
647   n.severity = NOTIF_OKAY;
648   n.time = vl->time;
649
650   ssnprintf (n.message, sizeof (n.message),
651       "Received a value for %s. It was missing for %u seconds.",
652       name, (unsigned int) update_delay);
653
654   plugin_dispatch_notification (&n);
655
656   return (0);
657 } /* int uc_update */
658
659 int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
660 {
661   gauge_t *ret = NULL;
662   size_t ret_num = 0;
663   cache_entry_t *ce = NULL;
664   int status = 0;
665
666   pthread_mutex_lock (&cache_lock);
667
668   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
669   {
670     assert (ce != NULL);
671
672     /* remove missing values from getval */
673     if (ce->state == STATE_MISSING)
674     {
675       status = -1;
676     }
677     else
678     {
679       ret_num = ce->values_num;
680       ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
681       if (ret == NULL)
682       {
683         ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
684         status = -1;
685       }
686       else
687       {
688         memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
689       }
690     }
691   }
692   else
693   {
694     DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
695     status = -1;
696   }
697
698   pthread_mutex_unlock (&cache_lock);
699
700   if (status == 0)
701   {
702     *ret_values = ret;
703     *ret_values_num = ret_num;
704   }
705
706   return (status);
707 } /* gauge_t *uc_get_rate_by_name */
708
709 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
710 {
711   char name[6 * DATA_MAX_NAME_LEN];
712   gauge_t *ret = NULL;
713   size_t ret_num = 0;
714   int status;
715
716   if (FORMAT_VL (name, sizeof (name), vl) != 0)
717   {
718     ERROR ("utils_cache: uc_get_rate: FORMAT_VL failed.");
719     return (NULL);
720   }
721
722   status = uc_get_rate_by_name (name, &ret, &ret_num);
723   if (status != 0)
724     return (NULL);
725
726   /* This is important - the caller has no other way of knowing how many
727    * values are returned. */
728   if (ret_num != (size_t) ds->ds_num)
729   {
730     ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
731         "but uc_get_rate_by_name returned %zu.",
732         ds->type, ds->ds_num, ret_num);
733     sfree (ret);
734     return (NULL);
735   }
736
737   return (ret);
738 } /* gauge_t *uc_get_rate */
739
740 int uc_get_names (char ***ret_names, time_t **ret_times, size_t *ret_number)
741 {
742   c_avl_iterator_t *iter;
743   char *key;
744   cache_entry_t *value;
745
746   char **names = NULL;
747   time_t *times = NULL;
748   size_t number = 0;
749
750   int status = 0;
751
752   if ((ret_names == NULL) || (ret_number == NULL))
753     return (-1);
754
755   pthread_mutex_lock (&cache_lock);
756
757   iter = c_avl_get_iterator (cache_tree);
758   while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
759   {
760     char **temp;
761
762     /* remove missing values when list values */
763     if (value->state == STATE_MISSING)
764       continue;
765
766     if (ret_times != NULL)
767     {
768       time_t *tmp_times;
769
770       tmp_times = (time_t *) realloc (times, sizeof (time_t) * (number + 1));
771       if (tmp_times == NULL)
772       {
773         status = -1;
774         break;
775       }
776       times = tmp_times;
777       times[number] = value->last_time;
778     }
779
780     temp = (char **) realloc (names, sizeof (char *) * (number + 1));
781     if (temp == NULL)
782     {
783       status = -1;
784       break;
785     }
786     names = temp;
787     names[number] = strdup (key);
788     if (names[number] == NULL)
789     {
790       status = -1;
791       break;
792     }
793     number++;
794   } /* while (c_avl_iterator_next) */
795
796   c_avl_iterator_destroy (iter);
797   pthread_mutex_unlock (&cache_lock);
798
799   if (status != 0)
800   {
801     size_t i;
802     
803     for (i = 0; i < number; i++)
804     {
805       sfree (names[i]);
806     }
807     sfree (names);
808
809     return (-1);
810   }
811
812   *ret_names = names;
813   if (ret_times != NULL)
814     *ret_times = times;
815   *ret_number = number;
816
817   return (0);
818 } /* int uc_get_names */
819
820 int uc_get_state (const data_set_t *ds, const value_list_t *vl)
821 {
822   char name[6 * DATA_MAX_NAME_LEN];
823   cache_entry_t *ce = NULL;
824   int ret = STATE_ERROR;
825
826   if (FORMAT_VL (name, sizeof (name), vl) != 0)
827   {
828     ERROR ("uc_get_state: FORMAT_VL failed.");
829     return (STATE_ERROR);
830   }
831
832   pthread_mutex_lock (&cache_lock);
833
834   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
835   {
836     assert (ce != NULL);
837     ret = ce->state;
838   }
839
840   pthread_mutex_unlock (&cache_lock);
841
842   return (ret);
843 } /* int uc_get_state */
844
845 int uc_set_state (const data_set_t *ds, const value_list_t *vl, int state)
846 {
847   char name[6 * DATA_MAX_NAME_LEN];
848   cache_entry_t *ce = NULL;
849   int ret = -1;
850
851   if (FORMAT_VL (name, sizeof (name), vl) != 0)
852   {
853     ERROR ("uc_get_state: FORMAT_VL failed.");
854     return (STATE_ERROR);
855   }
856
857   pthread_mutex_lock (&cache_lock);
858
859   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
860   {
861     assert (ce != NULL);
862     ret = ce->state;
863     ce->state = state;
864   }
865
866   pthread_mutex_unlock (&cache_lock);
867
868   return (ret);
869 } /* int uc_set_state */
870
871 int uc_get_history_by_name (const char *name,
872     gauge_t *ret_history, size_t num_steps, size_t num_ds)
873 {
874   cache_entry_t *ce = NULL;
875   size_t i;
876   int status = 0;
877
878   pthread_mutex_lock (&cache_lock);
879
880   status = c_avl_get (cache_tree, name, (void *) &ce);
881   if (status != 0)
882   {
883     pthread_mutex_unlock (&cache_lock);
884     return (-ENOENT);
885   }
886
887   if (((size_t) ce->values_num) != num_ds)
888   {
889     pthread_mutex_unlock (&cache_lock);
890     return (-EINVAL);
891   }
892
893   /* Check if there are enough values available. If not, increase the buffer
894    * size. */
895   if (ce->history_length < num_steps)
896   {
897     gauge_t *tmp;
898     size_t i;
899
900     tmp = realloc (ce->history, sizeof (*ce->history)
901         * num_steps * ce->values_num);
902     if (tmp == NULL)
903     {
904       pthread_mutex_unlock (&cache_lock);
905       return (-ENOMEM);
906     }
907
908     for (i = ce->history_length * ce->values_num;
909         i < (num_steps * ce->values_num);
910         i++)
911       tmp[i] = NAN;
912
913     ce->history = tmp;
914     ce->history_length = num_steps;
915   } /* if (ce->history_length < num_steps) */
916
917   /* Copy the values to the output buffer. */
918   for (i = 0; i < num_steps; i++)
919   {
920     size_t src_index;
921     size_t dst_index;
922
923     if (i < ce->history_index)
924       src_index = ce->history_index - (i + 1);
925     else
926       src_index = ce->history_length + ce->history_index - (i + 1);
927     src_index = src_index * num_ds;
928
929     dst_index = i * num_ds;
930
931     memcpy (ret_history + dst_index, ce->history + src_index,
932         sizeof (*ret_history) * num_ds);
933   }
934
935   pthread_mutex_unlock (&cache_lock);
936
937   return (0);
938 } /* int uc_get_history_by_name */
939
940 int uc_get_history (const data_set_t *ds, const value_list_t *vl,
941     gauge_t *ret_history, size_t num_steps, size_t num_ds)
942 {
943   char name[6 * DATA_MAX_NAME_LEN];
944
945   if (FORMAT_VL (name, sizeof (name), vl) != 0)
946   {
947     ERROR ("utils_cache: uc_get_history: FORMAT_VL failed.");
948     return (-1);
949   }
950
951   return (uc_get_history_by_name (name, ret_history, num_steps, num_ds));
952 } /* int uc_get_history */
953
954 int uc_get_hits (const data_set_t *ds, const value_list_t *vl)
955 {
956   char name[6 * DATA_MAX_NAME_LEN];
957   cache_entry_t *ce = NULL;
958   int ret = STATE_ERROR;
959
960   if (FORMAT_VL (name, sizeof (name), vl) != 0)
961   {
962     ERROR ("uc_get_state: FORMAT_VL failed.");
963     return (STATE_ERROR);
964   }
965
966   pthread_mutex_lock (&cache_lock);
967
968   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
969   {
970     assert (ce != NULL);
971     ret = ce->hits;
972   }
973
974   pthread_mutex_unlock (&cache_lock);
975
976   return (ret);
977 } /* int uc_get_hits */
978
979 int uc_set_hits (const data_set_t *ds, const value_list_t *vl, int hits)
980 {
981   char name[6 * DATA_MAX_NAME_LEN];
982   cache_entry_t *ce = NULL;
983   int ret = -1;
984
985   if (FORMAT_VL (name, sizeof (name), vl) != 0)
986   {
987     ERROR ("uc_get_state: FORMAT_VL failed.");
988     return (STATE_ERROR);
989   }
990
991   pthread_mutex_lock (&cache_lock);
992
993   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
994   {
995     assert (ce != NULL);
996     ret = ce->hits;
997     ce->hits = hits;
998   }
999
1000   pthread_mutex_unlock (&cache_lock);
1001
1002   return (ret);
1003 } /* int uc_set_hits */
1004
1005 int uc_inc_hits (const data_set_t *ds, const value_list_t *vl, int step)
1006 {
1007   char name[6 * DATA_MAX_NAME_LEN];
1008   cache_entry_t *ce = NULL;
1009   int ret = -1;
1010
1011   if (FORMAT_VL (name, sizeof (name), vl) != 0)
1012   {
1013     ERROR ("uc_get_state: FORMAT_VL failed.");
1014     return (STATE_ERROR);
1015   }
1016
1017   pthread_mutex_lock (&cache_lock);
1018
1019   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
1020   {
1021     assert (ce != NULL);
1022     ret = ce->hits;
1023     ce->hits = ret + step;
1024   }
1025
1026   pthread_mutex_unlock (&cache_lock);
1027
1028   return (ret);
1029 } /* int uc_inc_hits */
1030
1031 /*
1032  * Meta data interface
1033  */
1034 /* XXX: This function will acquire `cache_lock' but will not free it! */
1035 static meta_data_t *uc_get_meta (const value_list_t *vl) /* {{{ */
1036 {
1037   char name[6 * DATA_MAX_NAME_LEN];
1038   cache_entry_t *ce = NULL;
1039   int status;
1040
1041   status = FORMAT_VL (name, sizeof (name), vl);
1042   if (status != 0)
1043   {
1044     ERROR ("utils_cache: uc_get_meta: FORMAT_VL failed.");
1045     return (NULL);
1046   }
1047
1048   pthread_mutex_lock (&cache_lock);
1049
1050   status = c_avl_get (cache_tree, name, (void *) &ce);
1051   if (status != 0)
1052   {
1053     pthread_mutex_unlock (&cache_lock);
1054     return (NULL);
1055   }
1056   assert (ce != NULL);
1057
1058   if (ce->meta == NULL)
1059     ce->meta = meta_data_create ();
1060
1061   if (ce->meta == NULL)
1062     pthread_mutex_unlock (&cache_lock);
1063
1064   return (ce->meta);
1065 } /* }}} meta_data_t *uc_get_meta */
1066
1067 /* Sorry about this preprocessor magic, but it really makes this file much
1068  * shorter.. */
1069 #define UC_WRAP(wrap_function) { \
1070   meta_data_t *meta; \
1071   int status; \
1072   meta = uc_get_meta (vl); \
1073   if (meta == NULL) return (-1); \
1074   status = wrap_function (meta, key); \
1075   pthread_mutex_unlock (&cache_lock); \
1076   return (status); \
1077 }
1078 int uc_meta_data_exists (const value_list_t *vl, const char *key)
1079   UC_WRAP (meta_data_exists)
1080
1081 int uc_meta_data_delete (const value_list_t *vl, const char *key)
1082   UC_WRAP (meta_data_delete)
1083 #undef UC_WRAP
1084
1085 /* We need a new version of this macro because the following functions take
1086  * two argumetns. */
1087 #define UC_WRAP(wrap_function) { \
1088   meta_data_t *meta; \
1089   int status; \
1090   meta = uc_get_meta (vl); \
1091   if (meta == NULL) return (-1); \
1092   status = wrap_function (meta, key, value); \
1093   pthread_mutex_unlock (&cache_lock); \
1094   return (status); \
1095 }
1096 int uc_meta_data_add_string (const value_list_t *vl,
1097     const char *key,
1098     const char *value)
1099   UC_WRAP(meta_data_add_string)
1100 int uc_meta_data_add_signed_int (const value_list_t *vl,
1101     const char *key,
1102     int64_t value)
1103   UC_WRAP(meta_data_add_signed_int)
1104 int uc_meta_data_add_unsigned_int (const value_list_t *vl,
1105     const char *key,
1106     uint64_t value)
1107   UC_WRAP(meta_data_add_unsigned_int)
1108 int uc_meta_data_add_double (const value_list_t *vl,
1109     const char *key,
1110     double value)
1111   UC_WRAP(meta_data_add_double)
1112 int uc_meta_data_add_boolean (const value_list_t *vl,
1113     const char *key,
1114     _Bool value)
1115   UC_WRAP(meta_data_add_boolean)
1116
1117 int uc_meta_data_get_string (const value_list_t *vl,
1118     const char *key,
1119     char **value)
1120   UC_WRAP(meta_data_get_string)
1121 int uc_meta_data_get_signed_int (const value_list_t *vl,
1122     const char *key,
1123     int64_t *value)
1124   UC_WRAP(meta_data_get_signed_int)
1125 int uc_meta_data_get_unsigned_int (const value_list_t *vl,
1126     const char *key,
1127     uint64_t *value)
1128   UC_WRAP(meta_data_get_unsigned_int)
1129 int uc_meta_data_get_double (const value_list_t *vl,
1130     const char *key,
1131     double *value)
1132   UC_WRAP(meta_data_get_double)
1133 int uc_meta_data_get_boolean (const value_list_t *vl,
1134     const char *key,
1135     _Bool *value)
1136   UC_WRAP(meta_data_get_boolean)
1137 #undef UC_WRAP
1138
1139 /* vim: set sw=2 ts=8 sts=2 tw=78 : */