X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fdaemon%2Futils_cache.c;h=cf2095eec12fb0190ce30e187173ea6d7490d433;hp=ea7c3e33b4ab9e064b26e26ae6657d983dc605ea;hb=master;hpb=5dbb7471b0a7ca7506f56f4c6dbaf58e790c6b7c diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index ea7c3e33..cf2095ee 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -28,10 +28,10 @@ #include "collectd.h" -#include "common.h" -#include "meta_data.h" #include "plugin.h" -#include "utils_avltree.h" +#include "utils/avltree/avltree.h" +#include "utils/common/common.h" +#include "utils/metadata/meta_data.h" #include "utils_cache.h" #include @@ -67,6 +67,7 @@ typedef struct cache_entry_s { size_t history_length; meta_data_t *meta; + unsigned long callbacks_mask; } cache_entry_t; struct uc_iter_s { @@ -76,7 +77,7 @@ struct uc_iter_s { cache_entry_t *entry; }; -static c_avl_tree_t *cache_tree = NULL; +static c_avl_tree_t *cache_tree; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static int cache_compare(const cache_entry_t *a, const cache_entry_t *b) { @@ -140,21 +141,18 @@ static void uc_check_range(const data_set_t *ds, cache_entry_t *ce) { static int uc_insert(const data_set_t *ds, const value_list_t *vl, const char *key) { - char *key_copy; - cache_entry_t *ce; - /* `cache_lock' has been locked by `uc_update' */ - key_copy = strdup(key); + char *key_copy = strdup(key); if (key_copy == NULL) { ERROR("uc_insert: strdup failed."); return -1; } - ce = cache_alloc(ds->ds_num); + cache_entry_t *ce = cache_alloc(ds->ds_num); if (ce == NULL) { sfree(key_copy); - ERROR("uc_insert: cache_alloc (%zu) failed.", ds->ds_num); + ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num); return -1; } @@ -201,7 +199,11 @@ static int uc_insert(const data_set_t *ds, const value_list_t *vl, ce->last_time = vl->time; ce->last_update = cdtime(); ce->interval = vl->interval; - ce->state = STATE_OKAY; + ce->state = STATE_UNKNOWN; + + if (vl->meta != NULL) { + ce->meta = meta_data_clone(vl->meta); + } if (c_avl_insert(cache_tree, key_copy, ce) != 0) { sfree(key_copy); @@ -226,6 +228,7 @@ int uc_check_timeout(void) { char *key; cdtime_t time; cdtime_t interval; + unsigned long callbacks_mask; } *expired = NULL; size_t expired_num = 0; @@ -251,6 +254,7 @@ int uc_check_timeout(void) { expired[expired_num].key = strdup(key); expired[expired_num].time = ce->last_time; expired[expired_num].interval = ce->interval; + expired[expired_num].callbacks_mask = ce->callbacks_mask; if (expired[expired_num].key == NULL) { ERROR("uc_check_timeout: strdup failed."); @@ -275,7 +279,8 @@ int uc_check_timeout(void) { * plugin calls the cache interface. */ for (size_t i = 0; i < expired_num; i++) { value_list_t vl = { - .time = expired[i].time, .interval = expired[i].interval, + .time = expired[i].time, + .interval = expired[i].interval, }; if (parse_identifier_vl(expired[i].key, &vl) != 0) { @@ -285,6 +290,10 @@ int uc_check_timeout(void) { } plugin_dispatch_missing(&vl); + + if (expired[i].callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask, + expired[i].key, &vl); } /* for (i = 0; i < expired_num; i++) */ /* Now actually remove all the values from the cache. We don't re-evaluate @@ -314,8 +323,6 @@ int uc_check_timeout(void) { int uc_update(const data_set_t *ds, const value_list_t *vl) { char name[6 * DATA_MAX_NAME_LEN]; - cache_entry_t *ce = NULL; - int status; if (FORMAT_VL(name, sizeof(name), vl) != 0) { ERROR("uc_update: FORMAT_VL failed."); @@ -324,11 +331,16 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { pthread_mutex_lock(&cache_lock); - status = c_avl_get(cache_tree, name, (void *)&ce); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) /* entry does not yet exist */ { status = uc_insert(ds, vl, name); pthread_mutex_unlock(&cache_lock); + + if (status == 0) + plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl); + return status; } @@ -381,7 +393,7 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { return -1; } /* switch (ds->ds[i].type) */ - DEBUG("uc_update: %s: ds[%zu] = %lf", name, i, ce->values_gauge[i]); + DEBUG("uc_update: %s: ds[%" PRIsz "] = %lf", name, i, ce->values_gauge[i]); } /* for (i) */ /* Update the history if it exists. */ @@ -403,11 +415,32 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { ce->last_update = cdtime(); ce->interval = vl->interval; + /* Check if cache entry has registered callbacks */ + unsigned long callbacks_mask = ce->callbacks_mask; + pthread_mutex_unlock(&cache_lock); + if (callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl); + return 0; } /* int uc_update */ +int uc_set_callbacks_mask(const char *name, unsigned long mask) { + pthread_mutex_lock(&cache_lock); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); + if (status != 0) { /* Ouch, just created entry disappeared ?! */ + ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name); + pthread_mutex_unlock(&cache_lock); + return -1; + } + DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask); + ce->callbacks_mask = mask; + pthread_mutex_unlock(&cache_lock); + return 0; +} + int uc_get_rate_by_name(const char *name, gauge_t **ret_values, size_t *ret_values_num) { gauge_t *ret = NULL; @@ -469,8 +502,8 @@ gauge_t *uc_get_rate(const data_set_t *ds, const value_list_t *vl) { /* This is important - the caller has no other way of knowing how many * values are returned. */ if (ret_num != ds->ds_num) { - ERROR("utils_cache: uc_get_rate: ds[%s] has %zu values, " - "but uc_get_rate_by_name returned %zu.", + ERROR("utils_cache: uc_get_rate: ds[%s] has %" PRIsz " values, " + "but uc_get_rate_by_name returned %" PRIsz ".", ds->type, ds->ds_num, ret_num); sfree(ret); return NULL; @@ -488,7 +521,7 @@ int uc_get_value_by_name(const char *name, value_t **ret_values, pthread_mutex_lock(&cache_lock); - if (c_avl_get(cache_tree, name, (void *) &ce) == 0) { + if (c_avl_get(cache_tree, name, (void *)&ce) == 0) { assert(ce != NULL); /* remove missing values from getval */ @@ -504,8 +537,7 @@ int uc_get_value_by_name(const char *name, value_t **ret_values, memcpy(ret, ce->values_raw, ret_num * sizeof(value_t)); } } - } - else { + } else { DEBUG("utils_cache: uc_get_value_by_name: No such value: %s", name); status = -1; } @@ -537,10 +569,10 @@ value_t *uc_get_value(const data_set_t *ds, const value_list_t *vl) { /* This is important - the caller has no other way of knowing how many * values are returned. */ - if (ret_num != (size_t) ds->ds_num) { - ERROR("utils_cache: uc_get_value: ds[%s] has %zu values, " - "but uc_get_value_by_name returned %zu.", ds->type, ds->ds_num, - ret_num); + if (ret_num != (size_t)ds->ds_num) { + ERROR("utils_cache: uc_get_value: ds[%s] has %" PRIsz " values, " + "but uc_get_value_by_name returned %" PRIsz ".", + ds->type, ds->ds_num, ret_num); sfree(ret); return (NULL); } @@ -827,9 +859,7 @@ int uc_inc_hits(const data_set_t *ds, const value_list_t *vl, int step) { * Iterator interface */ uc_iter_t *uc_get_iterator(void) { - uc_iter_t *iter; - - iter = (uc_iter_t *)calloc(1, sizeof(*iter)); + uc_iter_t *iter = calloc(1, sizeof(*iter)); if (iter == NULL) return NULL; @@ -892,13 +922,12 @@ int uc_iterator_get_values(uc_iter_t *iter, value_t **ret_values, if ((iter == NULL) || (iter->entry == NULL) || (ret_values == NULL) || (ret_num == NULL)) return -1; - *ret_values = calloc(iter->entry->values_num, sizeof(*iter->entry->values_raw)); if (*ret_values == NULL) return -1; for (size_t i = 0; i < iter->entry->values_num; ++i) - *ret_values[i] = iter->entry->values_raw[i]; + (*ret_values)[i] = iter->entry->values_raw[i]; *ret_num = iter->entry->values_num; @@ -969,11 +998,17 @@ static meta_data_t *uc_get_meta(const value_list_t *vl) /* {{{ */ pthread_mutex_unlock(&cache_lock); \ return status; \ } -int uc_meta_data_exists(const value_list_t *vl, - const char *key) UC_WRAP(meta_data_exists) +int uc_meta_data_exists(const value_list_t *vl, const char *key) + UC_WRAP(meta_data_exists) + + int uc_meta_data_delete(const value_list_t *vl, const char *key) + UC_WRAP(meta_data_delete) + + /* The second argument is called `toc` in the API, but the macro expects + * `key`. */ + int uc_meta_data_toc(const value_list_t *vl, + char ***key) UC_WRAP(meta_data_toc) - int uc_meta_data_delete(const value_list_t *vl, - const char *key) UC_WRAP(meta_data_delete) #undef UC_WRAP /* We need a new version of this macro because the following functions take @@ -999,7 +1034,7 @@ int uc_meta_data_exists(const value_list_t *vl, const value_list_t *vl, const char *key, double value) UC_WRAP(meta_data_add_double) int uc_meta_data_add_boolean( const value_list_t *vl, const char *key, - _Bool value) UC_WRAP(meta_data_add_boolean) + bool value) UC_WRAP(meta_data_add_boolean) int uc_meta_data_get_string(const value_list_t *vl, const char *key, @@ -1015,6 +1050,6 @@ int uc_meta_data_exists(const value_list_t *vl, const char *key, double *value) UC_WRAP(meta_data_get_double) int uc_meta_data_get_boolean( const value_list_t *vl, - const char *key, _Bool *value) + const char *key, bool *value) UC_WRAP(meta_data_get_boolean) #undef UC_WRAP