src/utils_cache.[ch]: Make the `value_list_t *' const.
[collectd.git] / src / utils_cache.c
1 /**
2  * collectd - src/utils_cache.c
3  * Copyright (C) 2007,2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "utils_avltree.h"
26 #include "utils_cache.h"
27 #include "utils_threshold.h"
28 #include "meta_data.h"
29
30 #include <assert.h>
31 #include <pthread.h>
32
33 typedef struct cache_entry_s
34 {
35         char name[6 * DATA_MAX_NAME_LEN];
36         int        values_num;
37         gauge_t   *values_gauge;
38         value_t   *values_raw;
39         /* Time contained in the package
40          * (for calculating rates) */
41         time_t last_time;
42         /* Time according to the local clock
43          * (for purging old entries) */
44         time_t last_update;
45         /* Interval in which the data is collected
46          * (for purding old entries) */
47         int interval;
48         int state;
49
50         /*
51          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
52          * !  0  !  1  !  2  !  3  !  4  !  5  !  6  !  7  !  8  ! ...
53          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
54          * ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ...
55          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
56          * !      t = 0      !      t = 1      !      t = 2      ! ...
57          * +-----------------+-----------------+-----------------+----
58          */
59         gauge_t *history;
60         size_t   history_index; /* points to the next position to write to. */
61         size_t   history_length;
62
63         meta_data_t *meta;
64 } cache_entry_t;
65
66 static c_avl_tree_t   *cache_tree = NULL;
67 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
68
69 static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
70 {
71   assert ((a != NULL) && (b != NULL));
72   return (strcmp (a->name, b->name));
73 } /* int cache_compare */
74
75 static cache_entry_t *cache_alloc (int values_num)
76 {
77   cache_entry_t *ce;
78
79   ce = (cache_entry_t *) malloc (sizeof (cache_entry_t));
80   if (ce == NULL)
81   {
82     ERROR ("utils_cache: cache_alloc: malloc failed.");
83     return (NULL);
84   }
85   memset (ce, '\0', sizeof (cache_entry_t));
86   ce->values_num = values_num;
87
88   ce->values_gauge = calloc (values_num, sizeof (*ce->values_gauge));
89   ce->values_raw   = calloc (values_num, sizeof (*ce->values_raw));
90   if ((ce->values_gauge == NULL) || (ce->values_raw == NULL))
91   {
92     sfree (ce->values_gauge);
93     sfree (ce->values_raw);
94     sfree (ce);
95     ERROR ("utils_cache: cache_alloc: calloc failed.");
96     return (NULL);
97   }
98
99   ce->history = NULL;
100   ce->history_length = 0;
101   ce->meta = NULL;
102
103   return (ce);
104 } /* cache_entry_t *cache_alloc */
105
106 static void cache_free (cache_entry_t *ce)
107 {
108   if (ce == NULL)
109     return;
110
111   sfree (ce->values_gauge);
112   sfree (ce->values_raw);
113   sfree (ce->history);
114   if (ce->meta != NULL)
115   {
116     meta_data_destroy (ce->meta);
117     ce->meta = NULL;
118   }
119   sfree (ce);
120 } /* void cache_free */
121
122 static int uc_send_notification (const char *name)
123 {
124   cache_entry_t *ce = NULL;
125   int status;
126
127   char *name_copy;
128   char *host;
129   char *plugin;
130   char *plugin_instance;
131   char *type;
132   char *type_instance;
133
134   notification_t n;
135
136   name_copy = strdup (name);
137   if (name_copy == NULL)
138   {
139     ERROR ("uc_send_notification: strdup failed.");
140     return (-1);
141   }
142
143   status = parse_identifier (name_copy, &host,
144       &plugin, &plugin_instance,
145       &type, &type_instance);
146   if (status != 0)
147   {
148     ERROR ("uc_send_notification: Cannot parse name `%s'", name);
149     return (-1);
150   }
151
152   /* Copy the associative members */
153   notification_init (&n, NOTIF_FAILURE, /* host = */ NULL,
154       host, plugin, plugin_instance, type, type_instance);
155
156   sfree (name_copy);
157   name_copy = host = plugin = plugin_instance = type = type_instance = NULL;
158
159   pthread_mutex_lock (&cache_lock);
160
161   /*
162    * Set the time _after_ getting the lock because we don't know how long
163    * acquiring the lock takes and we will use this time later to decide
164    * whether or not the state is OKAY.
165    */
166   n.time = time (NULL);
167
168   status = c_avl_get (cache_tree, name, (void *) &ce);
169   if (status != 0)
170   {
171     pthread_mutex_unlock (&cache_lock);
172     sfree (name_copy);
173     return (-1);
174   }
175     
176   /* Check if the entry has been updated in the meantime */
177   if ((n.time - ce->last_update) < (2 * ce->interval))
178   {
179     ce->state = STATE_OKAY;
180     pthread_mutex_unlock (&cache_lock);
181     sfree (name_copy);
182     return (-1);
183   }
184
185   ssnprintf (n.message, sizeof (n.message),
186       "%s has not been updated for %i seconds.", name,
187       (int) (n.time - ce->last_update));
188
189   pthread_mutex_unlock (&cache_lock);
190
191   plugin_dispatch_notification (&n);
192
193   return (0);
194 } /* int uc_send_notification */
195
196 static void uc_check_range (const data_set_t *ds, cache_entry_t *ce)
197 {
198   int i;
199
200   for (i = 0; i < ds->ds_num; i++)
201   {
202     if (isnan (ce->values_gauge[i]))
203       continue;
204     else if (ce->values_gauge[i] < ds->ds[i].min)
205       ce->values_gauge[i] = NAN;
206     else if (ce->values_gauge[i] > ds->ds[i].max)
207       ce->values_gauge[i] = NAN;
208   }
209 } /* void uc_check_range */
210
211 static int uc_insert (const data_set_t *ds, const value_list_t *vl,
212     const char *key)
213 {
214   int i;
215   char *key_copy;
216   cache_entry_t *ce;
217
218   /* `cache_lock' has been locked by `uc_update' */
219
220   key_copy = strdup (key);
221   if (key_copy == NULL)
222   {
223     ERROR ("uc_insert: strdup failed.");
224     return (-1);
225   }
226
227   ce = cache_alloc (ds->ds_num);
228   if (ce == NULL)
229   {
230     sfree (key_copy);
231     ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
232     return (-1);
233   }
234
235   sstrncpy (ce->name, key, sizeof (ce->name));
236
237   for (i = 0; i < ds->ds_num; i++)
238   {
239     switch (ds->ds[i].type)
240     {
241       case DS_TYPE_COUNTER:
242         ce->values_gauge[i] = NAN;
243         ce->values_raw[i].counter = vl->values[i].counter;
244         break;
245
246       case DS_TYPE_GAUGE:
247         ce->values_gauge[i] = vl->values[i].gauge;
248         ce->values_raw[i].gauge = vl->values[i].gauge;
249         break;
250
251       case DS_TYPE_DERIVE:
252         ce->values_gauge[i] = NAN;
253         ce->values_raw[i].derive = vl->values[i].derive;
254         break;
255
256       case DS_TYPE_ABSOLUTE:
257         ce->values_gauge[i] = NAN;
258         if (vl->interval > 0)
259           ce->values_gauge[i] = ((double) vl->values[i].absolute)
260             / ((double) vl->interval);
261         ce->values_raw[i].absolute = vl->values[i].absolute;
262         break;
263         
264       default:
265         /* This shouldn't happen. */
266         ERROR ("uc_insert: Don't know how to handle data source type %i.",
267             ds->ds[i].type);
268         return (-1);
269     } /* switch (ds->ds[i].type) */
270   } /* for (i) */
271
272   /* Prune invalid gauge data */
273   uc_check_range (ds, ce);
274
275   ce->last_time = vl->time;
276   ce->last_update = time (NULL);
277   ce->interval = vl->interval;
278   ce->state = STATE_OKAY;
279
280   if (c_avl_insert (cache_tree, key_copy, ce) != 0)
281   {
282     sfree (key_copy);
283     ERROR ("uc_insert: c_avl_insert failed.");
284     return (-1);
285   }
286
287   DEBUG ("uc_insert: Added %s to the cache.", key);
288   return (0);
289 } /* int uc_insert */
290
291 int uc_init (void)
292 {
293   if (cache_tree == NULL)
294     cache_tree = c_avl_create ((int (*) (const void *, const void *))
295         cache_compare);
296
297   return (0);
298 } /* int uc_init */
299
300 int uc_check_timeout (void)
301 {
302   time_t now;
303   cache_entry_t *ce;
304
305   char **keys = NULL;
306   int keys_len = 0;
307
308   char *key;
309   c_avl_iterator_t *iter;
310   int i;
311   
312   pthread_mutex_lock (&cache_lock);
313
314   now = time (NULL);
315
316   /* Build a list of entries to be flushed */
317   iter = c_avl_get_iterator (cache_tree);
318   while (c_avl_iterator_next (iter, (void *) &key, (void *) &ce) == 0)
319   {
320     /* If entry has not been updated, add to `keys' array */
321     if ((now - ce->last_update) >= (2 * ce->interval))
322     {
323       char **tmp;
324
325       tmp = (char **) realloc ((void *) keys,
326           (keys_len + 1) * sizeof (char *));
327       if (tmp == NULL)
328       {
329         ERROR ("uc_check_timeout: realloc failed.");
330         c_avl_iterator_destroy (iter);
331         sfree (keys);
332         pthread_mutex_unlock (&cache_lock);
333         return (-1);
334       }
335
336       keys = tmp;
337       keys[keys_len] = strdup (key);
338       if (keys[keys_len] == NULL)
339       {
340         ERROR ("uc_check_timeout: strdup failed.");
341         continue;
342       }
343       keys_len++;
344     }
345   } /* while (c_avl_iterator_next) */
346
347   ce = NULL;
348
349   for (i = 0; i < keys_len; i++)
350   {
351     int status;
352
353     status = ut_check_interesting (keys[i]);
354
355     if (status < 0)
356     {
357       ERROR ("uc_check_timeout: ut_check_interesting failed.");
358       sfree (keys[i]);
359       continue;
360     }
361     else if (status == 0) /* ``service'' is uninteresting */
362     {
363       DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
364           keys[i]);
365       status = c_avl_remove (cache_tree, keys[i],
366           (void *) &key, (void *) &ce);
367       if (status != 0)
368       {
369         ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
370       }
371       sfree (keys[i]);
372       sfree (key);
373       cache_free (ce);
374       continue;
375     }
376
377     /* If we get here, the value is ``interesting''. Query the record from the
378      * cache and update the state field. */
379     if (c_avl_get (cache_tree, keys[i], (void *) &ce) != 0)
380     {
381       ERROR ("uc_check_timeout: cannot get data for %s from cache", keys[i]);
382       /* Do not free `keys[i]' so a notification is sent further down. */
383       continue;
384     }
385     assert (ce != NULL);
386
387     if (status == 2) /* persist */
388     {
389       DEBUG ("uc_check_timeout: %s is missing, sending notification.",
390           keys[i]);
391       ce->state = STATE_MISSING;
392       /* Do not free `keys[i]' so a notification is sent further down. */
393     }
394     else if (status == 1) /* do not persist */
395     {
396       if (ce->state == STATE_MISSING)
397       {
398         DEBUG ("uc_check_timeout: %s is missing but "
399             "notification has already been sent.",
400             keys[i]);
401         /* Set `keys[i]' to NULL to no notification is sent. */
402         sfree (keys[i]);
403       }
404       else /* (ce->state != STATE_MISSING) */
405       {
406         DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
407             keys[i]);
408         ce->state = STATE_MISSING;
409         /* Do not free `keys[i]' so a notification is sent further down. */
410       }
411     }
412     else
413     {
414       WARNING ("uc_check_timeout: ut_check_interesting (%s) returned "
415           "invalid status %i.",
416           keys[i], status);
417       sfree (keys[i]);
418     }
419
420     /* Make really sure the next iteration doesn't work with this pointer.
421      * There have been too many bugs in the past.. :/  -- octo */
422     ce = NULL;
423   } /* for (keys[i]) */
424
425   c_avl_iterator_destroy (iter);
426
427   pthread_mutex_unlock (&cache_lock);
428
429   for (i = 0; i < keys_len; i++)
430   {
431     if (keys[i] == NULL)
432       continue;
433
434     uc_send_notification (keys[i]);
435     sfree (keys[i]);
436   }
437
438   sfree (keys);
439
440   return (0);
441 } /* int uc_check_timeout */
442
443 int uc_update (const data_set_t *ds, const value_list_t *vl)
444 {
445   char name[6 * DATA_MAX_NAME_LEN];
446   cache_entry_t *ce = NULL;
447   int send_okay_notification = 0;
448   time_t update_delay = 0;
449   notification_t n;
450   int status;
451   int i;
452
453   if (FORMAT_VL (name, sizeof (name), vl) != 0)
454   {
455     ERROR ("uc_update: FORMAT_VL failed.");
456     return (-1);
457   }
458
459   pthread_mutex_lock (&cache_lock);
460
461   status = c_avl_get (cache_tree, name, (void *) &ce);
462   if (status != 0) /* entry does not yet exist */
463   {
464     status = uc_insert (ds, vl, name);
465     pthread_mutex_unlock (&cache_lock);
466     return (status);
467   }
468
469   assert (ce != NULL);
470   assert (ce->values_num == ds->ds_num);
471
472   if (ce->last_time >= vl->time)
473   {
474     pthread_mutex_unlock (&cache_lock);
475     NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
476         "last cache update = %u;",
477         name, (unsigned int) vl->time, (unsigned int) ce->last_time);
478     return (-1);
479   }
480
481   /* Send a notification (after the lock has been released) if we switch the
482    * state from something else to `okay'. */
483   if (ce->state == STATE_MISSING)
484   {
485     send_okay_notification = 1;
486     ce->state = STATE_OKAY;
487     update_delay = time (NULL) - ce->last_update;
488   }
489
490   for (i = 0; i < ds->ds_num; i++)
491   {
492     switch (ds->ds[i].type)
493     {
494       case DS_TYPE_COUNTER:
495         {
496           counter_t diff;
497
498           /* check if the counter has wrapped around */
499           if (vl->values[i].counter < ce->values_raw[i].counter)
500           {
501             if (ce->values_raw[i].counter <= 4294967295U)
502               diff = (4294967295U - ce->values_raw[i].counter)
503                 + vl->values[i].counter;
504             else
505               diff = (18446744073709551615ULL - ce->values_raw[i].counter)
506                 + vl->values[i].counter;
507           }
508           else /* counter has NOT wrapped around */
509           {
510             diff = vl->values[i].counter - ce->values_raw[i].counter;
511           }
512
513           ce->values_gauge[i] = ((double) diff)
514             / ((double) (vl->time - ce->last_time));
515           ce->values_raw[i].counter = vl->values[i].counter;
516         }
517         break;
518
519       case DS_TYPE_GAUGE:
520         ce->values_raw[i].gauge = vl->values[i].gauge;
521         ce->values_gauge[i] = vl->values[i].gauge;
522         break;
523
524       case DS_TYPE_DERIVE:
525         {
526           derive_t diff;
527
528           diff = vl->values[i].derive - ce->values_raw[i].derive;
529
530           ce->values_gauge[i] = ((double) diff)
531             / ((double) (vl->time - ce->last_time));
532           ce->values_raw[i].derive = vl->values[i].derive;
533         }
534         break;
535
536       case DS_TYPE_ABSOLUTE:
537         ce->values_gauge[i] = ((double) vl->values[i].absolute)
538           / ((double) (vl->time - ce->last_time));
539         ce->values_raw[i].absolute = vl->values[i].absolute;
540         break;
541
542       default:
543         /* This shouldn't happen. */
544         pthread_mutex_unlock (&cache_lock);
545         ERROR ("uc_update: Don't know how to handle data source type %i.",
546             ds->ds[i].type);
547         return (-1);
548     } /* switch (ds->ds[i].type) */
549
550     DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
551   } /* for (i) */
552
553   /* Update the history if it exists. */
554   if (ce->history != NULL)
555   {
556     assert (ce->history_index < ce->history_length);
557     for (i = 0; i < ce->values_num; i++)
558     {
559       size_t hist_idx = (ce->values_num * ce->history_index) + i;
560       ce->history[hist_idx] = ce->values_gauge[i];
561     }
562
563     assert (ce->history_length > 0);
564     ce->history_index = (ce->history_index + 1) % ce->history_length;
565   }
566
567   /* Prune invalid gauge data */
568   uc_check_range (ds, ce);
569
570   ce->last_time = vl->time;
571   ce->last_update = time (NULL);
572   ce->interval = vl->interval;
573
574   pthread_mutex_unlock (&cache_lock);
575
576   if (send_okay_notification == 0)
577     return (0);
578
579   /* Do not send okay notifications for uninteresting values, i. e. values for
580    * which no threshold is configured. */
581   status = ut_check_interesting (name);
582   if (status <= 0)
583     return (0);
584
585   /* Initialize the notification */
586   memset (&n, '\0', sizeof (n));
587   NOTIFICATION_INIT_VL (&n, vl, ds);
588
589   n.severity = NOTIF_OKAY;
590   n.time = vl->time;
591
592   ssnprintf (n.message, sizeof (n.message),
593       "Received a value for %s. It was missing for %u seconds.",
594       name, (unsigned int) update_delay);
595
596   plugin_dispatch_notification (&n);
597
598   return (0);
599 } /* int uc_update */
600
601 int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
602 {
603   gauge_t *ret = NULL;
604   size_t ret_num = 0;
605   cache_entry_t *ce = NULL;
606   int status = 0;
607
608   pthread_mutex_lock (&cache_lock);
609
610   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
611   {
612     assert (ce != NULL);
613
614     ret_num = ce->values_num;
615     ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
616     if (ret == NULL)
617     {
618       ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
619       status = -1;
620     }
621     else
622     {
623       memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
624     }
625   }
626   else
627   {
628     DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
629     status = -1;
630   }
631
632   pthread_mutex_unlock (&cache_lock);
633
634   if (status == 0)
635   {
636     *ret_values = ret;
637     *ret_values_num = ret_num;
638   }
639
640   return (status);
641 } /* gauge_t *uc_get_rate_by_name */
642
643 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
644 {
645   char name[6 * DATA_MAX_NAME_LEN];
646   gauge_t *ret = NULL;
647   size_t ret_num = 0;
648   int status;
649
650   if (FORMAT_VL (name, sizeof (name), vl) != 0)
651   {
652     ERROR ("utils_cache: uc_get_rate: FORMAT_VL failed.");
653     return (NULL);
654   }
655
656   status = uc_get_rate_by_name (name, &ret, &ret_num);
657   if (status != 0)
658     return (NULL);
659
660   /* This is important - the caller has no other way of knowing how many
661    * values are returned. */
662   if (ret_num != (size_t) ds->ds_num)
663   {
664     ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
665         "but uc_get_rate_by_name returned %zu.",
666         ds->type, ds->ds_num, ret_num);
667     sfree (ret);
668     return (NULL);
669   }
670
671   return (ret);
672 } /* gauge_t *uc_get_rate */
673
674 int uc_get_names (char ***ret_names, time_t **ret_times, size_t *ret_number)
675 {
676   c_avl_iterator_t *iter;
677   char *key;
678   cache_entry_t *value;
679
680   char **names = NULL;
681   time_t *times = NULL;
682   size_t number = 0;
683
684   int status = 0;
685
686   if ((ret_names == NULL) || (ret_number == NULL))
687     return (-1);
688
689   pthread_mutex_lock (&cache_lock);
690
691   iter = c_avl_get_iterator (cache_tree);
692   while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
693   {
694     char **temp;
695
696     if (ret_times != NULL)
697     {
698       time_t *tmp_times;
699
700       tmp_times = (time_t *) realloc (times, sizeof (time_t) * (number + 1));
701       if (tmp_times == NULL)
702       {
703         status = -1;
704         break;
705       }
706       times = tmp_times;
707       times[number] = value->last_time;
708     }
709
710     temp = (char **) realloc (names, sizeof (char *) * (number + 1));
711     if (temp == NULL)
712     {
713       status = -1;
714       break;
715     }
716     names = temp;
717     names[number] = strdup (key);
718     if (names[number] == NULL)
719     {
720       status = -1;
721       break;
722     }
723     number++;
724   } /* while (c_avl_iterator_next) */
725
726   c_avl_iterator_destroy (iter);
727   pthread_mutex_unlock (&cache_lock);
728
729   if (status != 0)
730   {
731     size_t i;
732     
733     for (i = 0; i < number; i++)
734     {
735       sfree (names[i]);
736     }
737     sfree (names);
738
739     return (-1);
740   }
741
742   *ret_names = names;
743   if (ret_times != NULL)
744     *ret_times = times;
745   *ret_number = number;
746
747   return (0);
748 } /* int uc_get_names */
749
750 int uc_get_state (const data_set_t *ds, const value_list_t *vl)
751 {
752   char name[6 * DATA_MAX_NAME_LEN];
753   cache_entry_t *ce = NULL;
754   int ret = STATE_ERROR;
755
756   if (FORMAT_VL (name, sizeof (name), vl) != 0)
757   {
758     ERROR ("uc_get_state: FORMAT_VL failed.");
759     return (STATE_ERROR);
760   }
761
762   pthread_mutex_lock (&cache_lock);
763
764   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
765   {
766     assert (ce != NULL);
767     ret = ce->state;
768   }
769
770   pthread_mutex_unlock (&cache_lock);
771
772   return (ret);
773 } /* int uc_get_state */
774
775 int uc_set_state (const data_set_t *ds, const value_list_t *vl, int state)
776 {
777   char name[6 * DATA_MAX_NAME_LEN];
778   cache_entry_t *ce = NULL;
779   int ret = -1;
780
781   if (FORMAT_VL (name, sizeof (name), vl) != 0)
782   {
783     ERROR ("uc_get_state: FORMAT_VL failed.");
784     return (STATE_ERROR);
785   }
786
787   pthread_mutex_lock (&cache_lock);
788
789   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
790   {
791     assert (ce != NULL);
792     ret = ce->state;
793     ce->state = state;
794   }
795
796   pthread_mutex_unlock (&cache_lock);
797
798   return (ret);
799 } /* int uc_set_state */
800
801 int uc_get_history_by_name (const char *name,
802     gauge_t *ret_history, size_t num_steps, size_t num_ds)
803 {
804   cache_entry_t *ce = NULL;
805   size_t i;
806   int status = 0;
807
808   pthread_mutex_lock (&cache_lock);
809
810   status = c_avl_get (cache_tree, name, (void *) &ce);
811   if (status != 0)
812   {
813     pthread_mutex_unlock (&cache_lock);
814     return (-ENOENT);
815   }
816
817   if (((size_t) ce->values_num) != num_ds)
818   {
819     pthread_mutex_unlock (&cache_lock);
820     return (-EINVAL);
821   }
822
823   /* Check if there are enough values available. If not, increase the buffer
824    * size. */
825   if (ce->history_length < num_steps)
826   {
827     gauge_t *tmp;
828     size_t i;
829
830     tmp = realloc (ce->history, sizeof (*ce->history)
831         * num_steps * ce->values_num);
832     if (tmp == NULL)
833     {
834       pthread_mutex_unlock (&cache_lock);
835       return (-ENOMEM);
836     }
837
838     for (i = ce->history_length * ce->values_num;
839         i < (num_steps * ce->values_num);
840         i++)
841       tmp[i] = NAN;
842
843     ce->history = tmp;
844     ce->history_length = num_steps;
845   } /* if (ce->history_length < num_steps) */
846
847   /* Copy the values to the output buffer. */
848   for (i = 0; i < num_steps; i++)
849   {
850     size_t src_index;
851     size_t dst_index;
852
853     if (i < ce->history_index)
854       src_index = ce->history_index - (i + 1);
855     else
856       src_index = ce->history_length + ce->history_index - (i + 1);
857     src_index = src_index * num_ds;
858
859     dst_index = i * num_ds;
860
861     memcpy (ret_history + dst_index, ce->history + src_index,
862         sizeof (*ret_history) * num_ds);
863   }
864
865   pthread_mutex_unlock (&cache_lock);
866
867   return (0);
868 } /* int uc_get_history_by_name */
869
870 int uc_get_history (const data_set_t *ds, const value_list_t *vl,
871     gauge_t *ret_history, size_t num_steps, size_t num_ds)
872 {
873   char name[6 * DATA_MAX_NAME_LEN];
874
875   if (FORMAT_VL (name, sizeof (name), vl) != 0)
876   {
877     ERROR ("utils_cache: uc_get_history: FORMAT_VL failed.");
878     return (-1);
879   }
880
881   return (uc_get_history_by_name (name, ret_history, num_steps, num_ds));
882 } /* int uc_get_history */
883
884 /*
885  * Meta data interface
886  */
887 /* XXX: This function will acquire `cache_lock' but will not free it! */
888 static meta_data_t *uc_get_meta (const value_list_t *vl) /* {{{ */
889 {
890   char name[6 * DATA_MAX_NAME_LEN];
891   cache_entry_t *ce = NULL;
892   int status;
893
894   status = FORMAT_VL (name, sizeof (name), vl);
895   if (status != 0)
896   {
897     ERROR ("utils_cache: uc_get_meta: FORMAT_VL failed.");
898     return (NULL);
899   }
900
901   pthread_mutex_lock (&cache_lock);
902
903   status = c_avl_get (cache_tree, name, (void *) &ce);
904   if (status != 0)
905   {
906     pthread_mutex_unlock (&cache_lock);
907     NOTICE ("utils_cache: uc_get_meta: No such cache entry: %s", name);
908     return (NULL);
909   }
910   assert (ce != NULL);
911
912   if (ce->meta == NULL)
913     ce->meta = meta_data_create ();
914
915   return (ce->meta);
916 } /* }}} meta_data_t *uc_get_meta */
917
918 /* Sorry about this preprocessor magic, but it really makes this file much
919  * shorter.. */
920 #define UC_WRAP(wrap_function) { \
921   meta_data_t *meta; \
922   int status; \
923   meta = uc_get_meta (vl); \
924   if (meta == NULL) return (-1); \
925   status = wrap_function (meta, key); \
926   pthread_mutex_unlock (&cache_lock); \
927   return (status); \
928 }
929 int uc_meta_data_exists (const value_list_t *vl, const char *key)
930   UC_WRAP (meta_data_exists)
931
932 int uc_meta_data_delete (const value_list_t *vl, const char *key)
933   UC_WRAP (meta_data_delete)
934 #undef UC_WRAP
935
936 /* We need a new version of this macro because the following functions take
937  * two argumetns. */
938 #define UC_WRAP(wrap_function) { \
939   meta_data_t *meta; \
940   int status; \
941   meta = uc_get_meta (vl); \
942   if (meta == NULL) return (-1); \
943   status = wrap_function (meta, key, value); \
944   pthread_mutex_unlock (&cache_lock); \
945   return (status); \
946 }
947 int uc_meta_data_add_string (const value_list_t *vl,
948     const char *key,
949     const char *value)
950   UC_WRAP(meta_data_add_string)
951 int uc_meta_data_add_signed_int (const value_list_t *vl,
952     const char *key,
953     int64_t value)
954   UC_WRAP(meta_data_add_signed_int)
955 int uc_meta_data_add_unsigned_int (const value_list_t *vl,
956     const char *key,
957     uint64_t value)
958   UC_WRAP(meta_data_add_unsigned_int)
959 int uc_meta_data_add_double (const value_list_t *vl,
960     const char *key,
961     double value)
962   UC_WRAP(meta_data_add_double)
963 int uc_meta_data_add_boolean (const value_list_t *vl,
964     const char *key,
965     _Bool value)
966   UC_WRAP(meta_data_add_boolean)
967
968 int uc_meta_data_get_string (const value_list_t *vl,
969     const char *key,
970     char **value)
971   UC_WRAP(meta_data_get_string)
972 int uc_meta_data_get_signed_int (const value_list_t *vl,
973     const char *key,
974     int64_t *value)
975   UC_WRAP(meta_data_get_signed_int)
976 int uc_meta_data_get_unsigned_int (const value_list_t *vl,
977     const char *key,
978     uint64_t *value)
979   UC_WRAP(meta_data_get_unsigned_int)
980 int uc_meta_data_get_double (const value_list_t *vl,
981     const char *key,
982     double *value)
983   UC_WRAP(meta_data_get_double)
984 int uc_meta_data_get_boolean (const value_list_t *vl,
985     const char *key,
986     _Bool *value)
987   UC_WRAP(meta_data_get_boolean)
988 #undef UC_WRAP
989
990 /* vim: set sw=2 ts=8 sts=2 tw=78 : */