X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fdaemon%2Futils_cache.c;h=672b01f101e81af90a774b255a64bef2992d9669;hp=351c3a0fac2662e19d026f1d8032be76f0395b7f;hb=bf1c2612bc0405c895f754ebfbb24484122c7cfa;hpb=b4986d880987dd36f89cc866877a692111ca548e diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index 351c3a0f..672b01f1 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -67,6 +67,7 @@ typedef struct cache_entry_s { size_t history_length; meta_data_t *meta; + unsigned long callbacks_mask; } cache_entry_t; struct uc_iter_s { @@ -140,18 +141,15 @@ static void uc_check_range(const data_set_t *ds, cache_entry_t *ce) { static int uc_insert(const data_set_t *ds, const value_list_t *vl, const char *key) { - char *key_copy; - cache_entry_t *ce; - /* `cache_lock' has been locked by `uc_update' */ - key_copy = strdup(key); + char *key_copy = strdup(key); if (key_copy == NULL) { ERROR("uc_insert: strdup failed."); return -1; } - ce = cache_alloc(ds->ds_num); + cache_entry_t *ce = cache_alloc(ds->ds_num); if (ce == NULL) { sfree(key_copy); ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num); @@ -230,6 +228,7 @@ int uc_check_timeout(void) { char *key; cdtime_t time; cdtime_t interval; + unsigned long callbacks_mask; } *expired = NULL; size_t expired_num = 0; @@ -255,6 +254,7 @@ int uc_check_timeout(void) { expired[expired_num].key = strdup(key); expired[expired_num].time = ce->last_time; expired[expired_num].interval = ce->interval; + expired[expired_num].callbacks_mask = ce->callbacks_mask; if (expired[expired_num].key == NULL) { ERROR("uc_check_timeout: strdup failed."); @@ -290,6 +290,10 @@ int uc_check_timeout(void) { } plugin_dispatch_missing(&vl); + + if (expired[i].callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask, + expired[i].key, &vl); } /* for (i = 0; i < expired_num; i++) */ /* Now actually remove all the values from the cache. We don't re-evaluate @@ -319,8 +323,6 @@ int uc_check_timeout(void) { int uc_update(const data_set_t *ds, const value_list_t *vl) { char name[6 * DATA_MAX_NAME_LEN]; - cache_entry_t *ce = NULL; - int status; if (FORMAT_VL(name, sizeof(name), vl) != 0) { ERROR("uc_update: FORMAT_VL failed."); @@ -329,11 +331,16 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { pthread_mutex_lock(&cache_lock); - status = c_avl_get(cache_tree, name, (void *)&ce); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) /* entry does not yet exist */ { status = uc_insert(ds, vl, name); pthread_mutex_unlock(&cache_lock); + + if (status == 0) + plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl); + return status; } @@ -408,11 +415,32 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { ce->last_update = cdtime(); ce->interval = vl->interval; + /* Check if cache entry has registered callbacks */ + unsigned long callbacks_mask = ce->callbacks_mask; + pthread_mutex_unlock(&cache_lock); + if (callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl); + return 0; } /* int uc_update */ +int uc_set_callbacks_mask(const char *name, unsigned long mask) { + pthread_mutex_lock(&cache_lock); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); + if (status != 0) { /* Ouch, just created entry disappeared ?! */ + ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name); + pthread_mutex_unlock(&cache_lock); + return -1; + } + DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask); + ce->callbacks_mask = mask; + pthread_mutex_unlock(&cache_lock); + return 0; +} + int uc_get_rate_by_name(const char *name, gauge_t **ret_values, size_t *ret_values_num) { gauge_t *ret = NULL;