src/utils_cache.[ch]: Implement a meta-data interface for cached values.
[collectd.git] / src / utils_cache.c
1 /**
2  * collectd - src/utils_cache.c
3  * Copyright (C) 2007,2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "utils_avltree.h"
26 #include "utils_cache.h"
27 #include "utils_threshold.h"
28 #include "meta_data.h"
29
30 #include <assert.h>
31 #include <pthread.h>
32
33 typedef struct cache_entry_s
34 {
35         char name[6 * DATA_MAX_NAME_LEN];
36         int        values_num;
37         gauge_t   *values_gauge;
38         value_t   *values_raw;
39         /* Time contained in the package
40          * (for calculating rates) */
41         time_t last_time;
42         /* Time according to the local clock
43          * (for purging old entries) */
44         time_t last_update;
45         /* Interval in which the data is collected
46          * (for purding old entries) */
47         int interval;
48         int state;
49
50         /*
51          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
52          * !  0  !  1  !  2  !  3  !  4  !  5  !  6  !  7  !  8  ! ...
53          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
54          * ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ...
55          * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
56          * !      t = 0      !      t = 1      !      t = 2      ! ...
57          * +-----------------+-----------------+-----------------+----
58          */
59         gauge_t *history;
60         size_t   history_index; /* points to the next position to write to. */
61         size_t   history_length;
62
63         meta_data_t *meta;
64 } cache_entry_t;
65
66 static c_avl_tree_t   *cache_tree = NULL;
67 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
68
69 static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
70 {
71   assert ((a != NULL) && (b != NULL));
72   return (strcmp (a->name, b->name));
73 } /* int cache_compare */
74
75 static cache_entry_t *cache_alloc (int values_num)
76 {
77   cache_entry_t *ce;
78
79   ce = (cache_entry_t *) malloc (sizeof (cache_entry_t));
80   if (ce == NULL)
81   {
82     ERROR ("utils_cache: cache_alloc: malloc failed.");
83     return (NULL);
84   }
85   memset (ce, '\0', sizeof (cache_entry_t));
86   ce->values_num = values_num;
87
88   ce->values_gauge = calloc (values_num, sizeof (*ce->values_gauge));
89   ce->values_raw   = calloc (values_num, sizeof (*ce->values_raw));
90   if ((ce->values_gauge == NULL) || (ce->values_raw == NULL))
91   {
92     sfree (ce->values_gauge);
93     sfree (ce->values_raw);
94     sfree (ce);
95     ERROR ("utils_cache: cache_alloc: calloc failed.");
96     return (NULL);
97   }
98
99   ce->history = NULL;
100   ce->history_length = 0;
101
102   return (ce);
103 } /* cache_entry_t *cache_alloc */
104
105 static void cache_free (cache_entry_t *ce)
106 {
107   if (ce == NULL)
108     return;
109
110   sfree (ce->values_gauge);
111   sfree (ce->values_raw);
112   sfree (ce->history);
113   sfree (ce);
114 } /* void cache_free */
115
116 static int uc_send_notification (const char *name)
117 {
118   cache_entry_t *ce = NULL;
119   int status;
120
121   char *name_copy;
122   char *host;
123   char *plugin;
124   char *plugin_instance;
125   char *type;
126   char *type_instance;
127
128   notification_t n;
129
130   name_copy = strdup (name);
131   if (name_copy == NULL)
132   {
133     ERROR ("uc_send_notification: strdup failed.");
134     return (-1);
135   }
136
137   status = parse_identifier (name_copy, &host,
138       &plugin, &plugin_instance,
139       &type, &type_instance);
140   if (status != 0)
141   {
142     ERROR ("uc_send_notification: Cannot parse name `%s'", name);
143     return (-1);
144   }
145
146   /* Copy the associative members */
147   notification_init (&n, NOTIF_FAILURE, /* host = */ NULL,
148       host, plugin, plugin_instance, type, type_instance);
149
150   sfree (name_copy);
151   name_copy = host = plugin = plugin_instance = type = type_instance = NULL;
152
153   pthread_mutex_lock (&cache_lock);
154
155   /*
156    * Set the time _after_ getting the lock because we don't know how long
157    * acquiring the lock takes and we will use this time later to decide
158    * whether or not the state is OKAY.
159    */
160   n.time = time (NULL);
161
162   status = c_avl_get (cache_tree, name, (void *) &ce);
163   if (status != 0)
164   {
165     pthread_mutex_unlock (&cache_lock);
166     sfree (name_copy);
167     return (-1);
168   }
169     
170   /* Check if the entry has been updated in the meantime */
171   if ((n.time - ce->last_update) < (2 * ce->interval))
172   {
173     ce->state = STATE_OKAY;
174     pthread_mutex_unlock (&cache_lock);
175     sfree (name_copy);
176     return (-1);
177   }
178
179   ssnprintf (n.message, sizeof (n.message),
180       "%s has not been updated for %i seconds.", name,
181       (int) (n.time - ce->last_update));
182
183   pthread_mutex_unlock (&cache_lock);
184
185   plugin_dispatch_notification (&n);
186
187   return (0);
188 } /* int uc_send_notification */
189
190 static void uc_check_range (const data_set_t *ds, cache_entry_t *ce)
191 {
192   int i;
193
194   for (i = 0; i < ds->ds_num; i++)
195   {
196     if (isnan (ce->values_gauge[i]))
197       continue;
198     else if (ce->values_gauge[i] < ds->ds[i].min)
199       ce->values_gauge[i] = NAN;
200     else if (ce->values_gauge[i] > ds->ds[i].max)
201       ce->values_gauge[i] = NAN;
202   }
203 } /* void uc_check_range */
204
205 static int uc_insert (const data_set_t *ds, const value_list_t *vl,
206     const char *key)
207 {
208   int i;
209   char *key_copy;
210   cache_entry_t *ce;
211
212   /* `cache_lock' has been locked by `uc_update' */
213
214   key_copy = strdup (key);
215   if (key_copy == NULL)
216   {
217     ERROR ("uc_insert: strdup failed.");
218     return (-1);
219   }
220
221   ce = cache_alloc (ds->ds_num);
222   if (ce == NULL)
223   {
224     sfree (key_copy);
225     ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
226     return (-1);
227   }
228
229   sstrncpy (ce->name, key, sizeof (ce->name));
230
231   for (i = 0; i < ds->ds_num; i++)
232   {
233     switch (ds->ds[i].type)
234     {
235       case DS_TYPE_COUNTER:
236         ce->values_gauge[i] = NAN;
237         ce->values_raw[i].counter = vl->values[i].counter;
238         break;
239
240       case DS_TYPE_GAUGE:
241         ce->values_gauge[i] = vl->values[i].gauge;
242         ce->values_raw[i].gauge = vl->values[i].gauge;
243         break;
244
245       case DS_TYPE_DERIVE:
246         ce->values_gauge[i] = NAN;
247         ce->values_raw[i].derive = vl->values[i].derive;
248         break;
249
250       case DS_TYPE_ABSOLUTE:
251         ce->values_gauge[i] = NAN;
252         if (vl->interval > 0)
253           ce->values_gauge[i] = ((double) vl->values[i].absolute)
254             / ((double) vl->interval);
255         ce->values_raw[i].absolute = vl->values[i].absolute;
256         break;
257         
258       default:
259         /* This shouldn't happen. */
260         ERROR ("uc_insert: Don't know how to handle data source type %i.",
261             ds->ds[i].type);
262         return (-1);
263     } /* switch (ds->ds[i].type) */
264   } /* for (i) */
265
266   /* Prune invalid gauge data */
267   uc_check_range (ds, ce);
268
269   ce->last_time = vl->time;
270   ce->last_update = time (NULL);
271   ce->interval = vl->interval;
272   ce->state = STATE_OKAY;
273
274   if (c_avl_insert (cache_tree, key_copy, ce) != 0)
275   {
276     sfree (key_copy);
277     ERROR ("uc_insert: c_avl_insert failed.");
278     return (-1);
279   }
280
281   DEBUG ("uc_insert: Added %s to the cache.", key);
282   return (0);
283 } /* int uc_insert */
284
285 int uc_init (void)
286 {
287   if (cache_tree == NULL)
288     cache_tree = c_avl_create ((int (*) (const void *, const void *))
289         cache_compare);
290
291   return (0);
292 } /* int uc_init */
293
294 int uc_check_timeout (void)
295 {
296   time_t now;
297   cache_entry_t *ce;
298
299   char **keys = NULL;
300   int keys_len = 0;
301
302   char *key;
303   c_avl_iterator_t *iter;
304   int i;
305   
306   pthread_mutex_lock (&cache_lock);
307
308   now = time (NULL);
309
310   /* Build a list of entries to be flushed */
311   iter = c_avl_get_iterator (cache_tree);
312   while (c_avl_iterator_next (iter, (void *) &key, (void *) &ce) == 0)
313   {
314     /* If entry has not been updated, add to `keys' array */
315     if ((now - ce->last_update) >= (2 * ce->interval))
316     {
317       char **tmp;
318
319       tmp = (char **) realloc ((void *) keys,
320           (keys_len + 1) * sizeof (char *));
321       if (tmp == NULL)
322       {
323         ERROR ("uc_check_timeout: realloc failed.");
324         c_avl_iterator_destroy (iter);
325         sfree (keys);
326         pthread_mutex_unlock (&cache_lock);
327         return (-1);
328       }
329
330       keys = tmp;
331       keys[keys_len] = strdup (key);
332       if (keys[keys_len] == NULL)
333       {
334         ERROR ("uc_check_timeout: strdup failed.");
335         continue;
336       }
337       keys_len++;
338     }
339   } /* while (c_avl_iterator_next) */
340
341   ce = NULL;
342
343   for (i = 0; i < keys_len; i++)
344   {
345     int status;
346
347     status = ut_check_interesting (keys[i]);
348
349     if (status < 0)
350     {
351       ERROR ("uc_check_timeout: ut_check_interesting failed.");
352       sfree (keys[i]);
353       continue;
354     }
355     else if (status == 0) /* ``service'' is uninteresting */
356     {
357       DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
358           keys[i]);
359       status = c_avl_remove (cache_tree, keys[i],
360           (void *) &key, (void *) &ce);
361       if (status != 0)
362       {
363         ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
364       }
365       sfree (keys[i]);
366       sfree (key);
367       cache_free (ce);
368       continue;
369     }
370
371     /* If we get here, the value is ``interesting''. Query the record from the
372      * cache and update the state field. */
373     if (c_avl_get (cache_tree, keys[i], (void *) &ce) != 0)
374     {
375       ERROR ("uc_check_timeout: cannot get data for %s from cache", keys[i]);
376       /* Do not free `keys[i]' so a notification is sent further down. */
377       continue;
378     }
379     assert (ce != NULL);
380
381     if (status == 2) /* persist */
382     {
383       DEBUG ("uc_check_timeout: %s is missing, sending notification.",
384           keys[i]);
385       ce->state = STATE_MISSING;
386       /* Do not free `keys[i]' so a notification is sent further down. */
387     }
388     else if (status == 1) /* do not persist */
389     {
390       if (ce->state == STATE_MISSING)
391       {
392         DEBUG ("uc_check_timeout: %s is missing but "
393             "notification has already been sent.",
394             keys[i]);
395         /* Set `keys[i]' to NULL to no notification is sent. */
396         sfree (keys[i]);
397       }
398       else /* (ce->state != STATE_MISSING) */
399       {
400         DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
401             keys[i]);
402         ce->state = STATE_MISSING;
403         /* Do not free `keys[i]' so a notification is sent further down. */
404       }
405     }
406     else
407     {
408       WARNING ("uc_check_timeout: ut_check_interesting (%s) returned "
409           "invalid status %i.",
410           keys[i], status);
411       sfree (keys[i]);
412     }
413
414     /* Make really sure the next iteration doesn't work with this pointer.
415      * There have been too many bugs in the past.. :/  -- octo */
416     ce = NULL;
417   } /* for (keys[i]) */
418
419   c_avl_iterator_destroy (iter);
420
421   pthread_mutex_unlock (&cache_lock);
422
423   for (i = 0; i < keys_len; i++)
424   {
425     if (keys[i] == NULL)
426       continue;
427
428     uc_send_notification (keys[i]);
429     sfree (keys[i]);
430   }
431
432   sfree (keys);
433
434   return (0);
435 } /* int uc_check_timeout */
436
437 int uc_update (const data_set_t *ds, const value_list_t *vl)
438 {
439   char name[6 * DATA_MAX_NAME_LEN];
440   cache_entry_t *ce = NULL;
441   int send_okay_notification = 0;
442   time_t update_delay = 0;
443   notification_t n;
444   int status;
445   int i;
446
447   if (FORMAT_VL (name, sizeof (name), vl) != 0)
448   {
449     ERROR ("uc_update: FORMAT_VL failed.");
450     return (-1);
451   }
452
453   pthread_mutex_lock (&cache_lock);
454
455   status = c_avl_get (cache_tree, name, (void *) &ce);
456   if (status != 0) /* entry does not yet exist */
457   {
458     status = uc_insert (ds, vl, name);
459     pthread_mutex_unlock (&cache_lock);
460     return (status);
461   }
462
463   assert (ce != NULL);
464   assert (ce->values_num == ds->ds_num);
465
466   if (ce->last_time >= vl->time)
467   {
468     pthread_mutex_unlock (&cache_lock);
469     NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
470         "last cache update = %u;",
471         name, (unsigned int) vl->time, (unsigned int) ce->last_time);
472     return (-1);
473   }
474
475   /* Send a notification (after the lock has been released) if we switch the
476    * state from something else to `okay'. */
477   if (ce->state == STATE_MISSING)
478   {
479     send_okay_notification = 1;
480     ce->state = STATE_OKAY;
481     update_delay = time (NULL) - ce->last_update;
482   }
483
484   for (i = 0; i < ds->ds_num; i++)
485   {
486     switch (ds->ds[i].type)
487     {
488       case DS_TYPE_COUNTER:
489         {
490           counter_t diff;
491
492           /* check if the counter has wrapped around */
493           if (vl->values[i].counter < ce->values_raw[i].counter)
494           {
495             if (ce->values_raw[i].counter <= 4294967295U)
496               diff = (4294967295U - ce->values_raw[i].counter)
497                 + vl->values[i].counter;
498             else
499               diff = (18446744073709551615ULL - ce->values_raw[i].counter)
500                 + vl->values[i].counter;
501           }
502           else /* counter has NOT wrapped around */
503           {
504             diff = vl->values[i].counter - ce->values_raw[i].counter;
505           }
506
507           ce->values_gauge[i] = ((double) diff)
508             / ((double) (vl->time - ce->last_time));
509           ce->values_raw[i].counter = vl->values[i].counter;
510         }
511         break;
512
513       case DS_TYPE_GAUGE:
514         ce->values_raw[i].gauge = vl->values[i].gauge;
515         ce->values_gauge[i] = vl->values[i].gauge;
516         break;
517
518       case DS_TYPE_DERIVE:
519         {
520           derive_t diff;
521
522           diff = vl->values[i].derive - ce->values_raw[i].derive;
523
524           ce->values_gauge[i] = ((double) diff)
525             / ((double) (vl->time - ce->last_time));
526           ce->values_raw[i].derive = vl->values[i].derive;
527         }
528         break;
529
530       case DS_TYPE_ABSOLUTE:
531         ce->values_gauge[i] = ((double) vl->values[i].absolute)
532           / ((double) (vl->time - ce->last_time));
533         ce->values_raw[i].absolute = vl->values[i].absolute;
534         break;
535
536       default:
537         /* This shouldn't happen. */
538         pthread_mutex_unlock (&cache_lock);
539         ERROR ("uc_update: Don't know how to handle data source type %i.",
540             ds->ds[i].type);
541         return (-1);
542     } /* switch (ds->ds[i].type) */
543
544     DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
545   } /* for (i) */
546
547   /* Update the history if it exists. */
548   if (ce->history != NULL)
549   {
550     assert (ce->history_index < ce->history_length);
551     for (i = 0; i < ce->values_num; i++)
552     {
553       size_t hist_idx = (ce->values_num * ce->history_index) + i;
554       ce->history[hist_idx] = ce->values_gauge[i];
555     }
556
557     assert (ce->history_length > 0);
558     ce->history_index = (ce->history_index + 1) % ce->history_length;
559   }
560
561   /* Prune invalid gauge data */
562   uc_check_range (ds, ce);
563
564   ce->last_time = vl->time;
565   ce->last_update = time (NULL);
566   ce->interval = vl->interval;
567
568   pthread_mutex_unlock (&cache_lock);
569
570   if (send_okay_notification == 0)
571     return (0);
572
573   /* Do not send okay notifications for uninteresting values, i. e. values for
574    * which no threshold is configured. */
575   status = ut_check_interesting (name);
576   if (status <= 0)
577     return (0);
578
579   /* Initialize the notification */
580   memset (&n, '\0', sizeof (n));
581   NOTIFICATION_INIT_VL (&n, vl, ds);
582
583   n.severity = NOTIF_OKAY;
584   n.time = vl->time;
585
586   ssnprintf (n.message, sizeof (n.message),
587       "Received a value for %s. It was missing for %u seconds.",
588       name, (unsigned int) update_delay);
589
590   plugin_dispatch_notification (&n);
591
592   return (0);
593 } /* int uc_update */
594
595 int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
596 {
597   gauge_t *ret = NULL;
598   size_t ret_num = 0;
599   cache_entry_t *ce = NULL;
600   int status = 0;
601
602   pthread_mutex_lock (&cache_lock);
603
604   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
605   {
606     assert (ce != NULL);
607
608     ret_num = ce->values_num;
609     ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
610     if (ret == NULL)
611     {
612       ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
613       status = -1;
614     }
615     else
616     {
617       memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
618     }
619   }
620   else
621   {
622     DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
623     status = -1;
624   }
625
626   pthread_mutex_unlock (&cache_lock);
627
628   if (status == 0)
629   {
630     *ret_values = ret;
631     *ret_values_num = ret_num;
632   }
633
634   return (status);
635 } /* gauge_t *uc_get_rate_by_name */
636
637 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
638 {
639   char name[6 * DATA_MAX_NAME_LEN];
640   gauge_t *ret = NULL;
641   size_t ret_num = 0;
642   int status;
643
644   if (FORMAT_VL (name, sizeof (name), vl) != 0)
645   {
646     ERROR ("utils_cache: uc_get_rate: FORMAT_VL failed.");
647     return (NULL);
648   }
649
650   status = uc_get_rate_by_name (name, &ret, &ret_num);
651   if (status != 0)
652     return (NULL);
653
654   /* This is important - the caller has no other way of knowing how many
655    * values are returned. */
656   if (ret_num != (size_t) ds->ds_num)
657   {
658     ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
659         "but uc_get_rate_by_name returned %zu.",
660         ds->type, ds->ds_num, ret_num);
661     sfree (ret);
662     return (NULL);
663   }
664
665   return (ret);
666 } /* gauge_t *uc_get_rate */
667
668 int uc_get_names (char ***ret_names, time_t **ret_times, size_t *ret_number)
669 {
670   c_avl_iterator_t *iter;
671   char *key;
672   cache_entry_t *value;
673
674   char **names = NULL;
675   time_t *times = NULL;
676   size_t number = 0;
677
678   int status = 0;
679
680   if ((ret_names == NULL) || (ret_number == NULL))
681     return (-1);
682
683   pthread_mutex_lock (&cache_lock);
684
685   iter = c_avl_get_iterator (cache_tree);
686   while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
687   {
688     char **temp;
689
690     if (ret_times != NULL)
691     {
692       time_t *tmp_times;
693
694       tmp_times = (time_t *) realloc (times, sizeof (time_t) * (number + 1));
695       if (tmp_times == NULL)
696       {
697         status = -1;
698         break;
699       }
700       times = tmp_times;
701       times[number] = value->last_time;
702     }
703
704     temp = (char **) realloc (names, sizeof (char *) * (number + 1));
705     if (temp == NULL)
706     {
707       status = -1;
708       break;
709     }
710     names = temp;
711     names[number] = strdup (key);
712     if (names[number] == NULL)
713     {
714       status = -1;
715       break;
716     }
717     number++;
718   } /* while (c_avl_iterator_next) */
719
720   c_avl_iterator_destroy (iter);
721   pthread_mutex_unlock (&cache_lock);
722
723   if (status != 0)
724   {
725     size_t i;
726     
727     for (i = 0; i < number; i++)
728     {
729       sfree (names[i]);
730     }
731     sfree (names);
732
733     return (-1);
734   }
735
736   *ret_names = names;
737   if (ret_times != NULL)
738     *ret_times = times;
739   *ret_number = number;
740
741   return (0);
742 } /* int uc_get_names */
743
744 int uc_get_state (const data_set_t *ds, const value_list_t *vl)
745 {
746   char name[6 * DATA_MAX_NAME_LEN];
747   cache_entry_t *ce = NULL;
748   int ret = STATE_ERROR;
749
750   if (FORMAT_VL (name, sizeof (name), vl) != 0)
751   {
752     ERROR ("uc_get_state: FORMAT_VL failed.");
753     return (STATE_ERROR);
754   }
755
756   pthread_mutex_lock (&cache_lock);
757
758   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
759   {
760     assert (ce != NULL);
761     ret = ce->state;
762   }
763
764   pthread_mutex_unlock (&cache_lock);
765
766   return (ret);
767 } /* int uc_get_state */
768
769 int uc_set_state (const data_set_t *ds, const value_list_t *vl, int state)
770 {
771   char name[6 * DATA_MAX_NAME_LEN];
772   cache_entry_t *ce = NULL;
773   int ret = -1;
774
775   if (FORMAT_VL (name, sizeof (name), vl) != 0)
776   {
777     ERROR ("uc_get_state: FORMAT_VL failed.");
778     return (STATE_ERROR);
779   }
780
781   pthread_mutex_lock (&cache_lock);
782
783   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
784   {
785     assert (ce != NULL);
786     ret = ce->state;
787     ce->state = state;
788   }
789
790   pthread_mutex_unlock (&cache_lock);
791
792   return (ret);
793 } /* int uc_set_state */
794
795 int uc_get_history_by_name (const char *name,
796     gauge_t *ret_history, size_t num_steps, size_t num_ds)
797 {
798   cache_entry_t *ce = NULL;
799   size_t i;
800   int status = 0;
801
802   pthread_mutex_lock (&cache_lock);
803
804   status = c_avl_get (cache_tree, name, (void *) &ce);
805   if (status != 0)
806   {
807     pthread_mutex_unlock (&cache_lock);
808     return (-ENOENT);
809   }
810
811   if (((size_t) ce->values_num) != num_ds)
812   {
813     pthread_mutex_unlock (&cache_lock);
814     return (-EINVAL);
815   }
816
817   /* Check if there are enough values available. If not, increase the buffer
818    * size. */
819   if (ce->history_length < num_steps)
820   {
821     gauge_t *tmp;
822     size_t i;
823
824     tmp = realloc (ce->history, sizeof (*ce->history)
825         * num_steps * ce->values_num);
826     if (tmp == NULL)
827     {
828       pthread_mutex_unlock (&cache_lock);
829       return (-ENOMEM);
830     }
831
832     for (i = ce->history_length * ce->values_num;
833         i < (num_steps * ce->values_num);
834         i++)
835       tmp[i] = NAN;
836
837     ce->history = tmp;
838     ce->history_length = num_steps;
839   } /* if (ce->history_length < num_steps) */
840
841   /* Copy the values to the output buffer. */
842   for (i = 0; i < num_steps; i++)
843   {
844     size_t src_index;
845     size_t dst_index;
846
847     if (i < ce->history_index)
848       src_index = ce->history_index - (i + 1);
849     else
850       src_index = ce->history_length + ce->history_index - (i + 1);
851     src_index = src_index * num_ds;
852
853     dst_index = i * num_ds;
854
855     memcpy (ret_history + dst_index, ce->history + src_index,
856         sizeof (*ret_history) * num_ds);
857   }
858
859   pthread_mutex_unlock (&cache_lock);
860
861   return (0);
862 } /* int uc_get_history_by_name */
863
864 int uc_get_history (const data_set_t *ds, const value_list_t *vl,
865     gauge_t *ret_history, size_t num_steps, size_t num_ds)
866 {
867   char name[6 * DATA_MAX_NAME_LEN];
868
869   if (FORMAT_VL (name, sizeof (name), vl) != 0)
870   {
871     ERROR ("utils_cache: uc_get_history: FORMAT_VL failed.");
872     return (-1);
873   }
874
875   return (uc_get_history_by_name (name, ret_history, num_steps, num_ds));
876 } /* int uc_get_history */
877
878 /*
879  * Meta data interface
880  */
881 /* XXX: This function will acquire `cache_lock' but will not free it! */
882 static meta_data_t *uc_get_meta (value_list_t *vl) /* {{{ */
883 {
884   char name[6 * DATA_MAX_NAME_LEN];
885   cache_entry_t *ce = NULL;
886   int status;
887
888   status = FORMAT_VL (name, sizeof (name), vl);
889   if (status != 0)
890   {
891     ERROR ("utils_cache: uc_get_meta: FORMAT_VL failed.");
892     return (NULL);
893   }
894
895   pthread_mutex_lock (&cache_lock);
896
897   status = c_avl_get (cache_tree, name, (void *) &ce);
898   if (status != 0)
899   {
900     pthread_mutex_unlock (&cache_lock);
901     NOTICE ("utils_cache: uc_get_meta: No such cache entry: %s", name);
902     return (NULL);
903   }
904   assert (ce != NULL);
905
906   if (ce->meta == NULL)
907     ce->meta = meta_data_create ();
908
909   return (ce->meta);
910 } /* }}} meta_data_t *uc_get_meta */
911
912 /* Sorry about this preprocessor magic, but it really makes this file much
913  * shorter.. */
914 #define UC_WRAP(wrap_function) { \
915   meta_data_t *meta; \
916   int status; \
917   meta = uc_get_meta (vl); \
918   if (meta == NULL) return (-1); \
919   status = wrap_function (meta, key); \
920   pthread_mutex_unlock (&cache_lock); \
921   return (status); \
922 }
923 int uc_meta_data_exists (value_list_t *vl, const char *key)
924   UC_WRAP (meta_data_exists)
925
926 int uc_meta_data_delete (value_list_t *vl, const char *key)
927   UC_WRAP (meta_data_delete)
928 #undef UC_WRAP
929
930 /* We need a new version of this macro because the following functions take
931  * two argumetns. */
932 #define UC_WRAP(wrap_function) { \
933   meta_data_t *meta; \
934   int status; \
935   meta = uc_get_meta (vl); \
936   if (meta == NULL) return (-1); \
937   status = wrap_function (meta, key, value); \
938   pthread_mutex_unlock (&cache_lock); \
939   return (status); \
940 }
941 int uc_meta_data_add_string (value_list_t *vl,
942     const char *key,
943     const char *value)
944   UC_WRAP(meta_data_add_string)
945 int uc_meta_data_add_signed_int (value_list_t *vl,
946     const char *key,
947     int64_t value)
948   UC_WRAP(meta_data_add_signed_int)
949 int uc_meta_data_add_unsigned_int (value_list_t *vl,
950     const char *key,
951     uint64_t value)
952   UC_WRAP(meta_data_add_unsigned_int)
953 int uc_meta_data_add_double (value_list_t *vl,
954     const char *key,
955     double value)
956   UC_WRAP(meta_data_add_double)
957 int uc_meta_data_add_boolean (value_list_t *vl,
958     const char *key,
959     _Bool value)
960   UC_WRAP(meta_data_add_boolean)
961
962 int uc_meta_data_get_string (value_list_t *vl,
963     const char *key,
964     char **value)
965   UC_WRAP(meta_data_get_string)
966 int uc_meta_data_get_signed_int (value_list_t *vl,
967     const char *key,
968     int64_t *value)
969   UC_WRAP(meta_data_get_signed_int)
970 int uc_meta_data_get_unsigned_int (value_list_t *vl,
971     const char *key,
972     uint64_t *value)
973   UC_WRAP(meta_data_get_unsigned_int)
974 int uc_meta_data_get_double (value_list_t *vl,
975     const char *key,
976     double *value)
977   UC_WRAP(meta_data_get_double)
978 int uc_meta_data_get_boolean (value_list_t *vl,
979     const char *key,
980     _Bool *value)
981   UC_WRAP(meta_data_get_boolean)
982 #undef UC_WRAP
983
984 /* vim: set sw=2 ts=8 sts=2 tw=78 : */