X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fdaemon%2Fplugin.c;h=52cb0a4b7174b5344fa8a31c3082f11a9a17c6e9;hp=c2017acad204f582874b606cfcff61ef75ea3f47;hb=bf1c2612bc0405c895f754ebfbb24484122c7cfa;hpb=e414a5f4acfe3baf7aec0a9487bf6fceb929bf6b diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index c2017aca..52cb0a4b 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -30,14 +30,14 @@ #include "collectd.h" -#include "common.h" #include "configfile.h" #include "filter_chain.h" #include "plugin.h" -#include "utils_avltree.h" +#include "utils/avltree/avltree.h" +#include "utils/common/common.h" +#include "utils/heap/heap.h" #include "utils_cache.h" #include "utils_complain.h" -#include "utils_heap.h" #include "utils_llist.h" #include "utils_random.h" #include "utils_time.h" @@ -85,6 +85,14 @@ struct read_func_s { }; typedef struct read_func_s read_func_t; +struct cache_event_func_s { + plugin_cache_event_cb callback; + char *name; + user_data_t user_data; + plugin_ctx_t plugin_ctx; +}; +typedef struct cache_event_func_s cache_event_func_t; + struct write_queue_s; typedef struct write_queue_s write_queue_t; struct write_queue_s { @@ -112,6 +120,9 @@ static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; +static size_t list_cache_event_num; +static cache_event_func_t list_cache_event[32]; + static fc_chain_t *pre_cache_chain; static fc_chain_t *post_cache_chain; @@ -263,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */ static int register_callback(llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { - llentry_t *le; - char *key; if (*list == NULL) { *list = llist_create(); @@ -276,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */ } } - key = strdup(name); + char *key = strdup(name); if (key == NULL) { ERROR("plugin: register_callback: strdup failed."); destroy_callback(cf); return -1; } - le = llist_search(*list, name); + llentry_t *le = llist_search(*list, name); if (le == NULL) { le = llentry_create(key, cf); if (le == NULL) { @@ -296,9 +305,7 @@ static int register_callback(llist_t **list, /* {{{ */ llist_append(*list, le); } else { - callback_func_t *old_cf; - - old_cf = le->value; + callback_func_t *old_cf = le->value; le->value = cf; P_WARNING("register_callback: " @@ -320,7 +327,6 @@ static void log_list_callbacks(llist_t **list, /* {{{ */ int i; llentry_t *le; int n; - char **keys; n = llist_size(*list); if (n == 0) { @@ -328,11 +334,9 @@ static void log_list_callbacks(llist_t **list, /* {{{ */ return; } - keys = calloc(n, sizeof(char *)); - + char **keys = calloc(n, sizeof(*keys)); if (keys == NULL) { ERROR("%s: failed to allocate memory for list of callbacks", comment); - return; } @@ -369,7 +373,8 @@ static int create_register_callback(llist_t **list, /* {{{ */ cf->cf_callback = callback; if (ud == NULL) { cf->cf_udata = (user_data_t){ - .data = NULL, .free_func = NULL, + .data = NULL, + .free_func = NULL, }; } else { cf->cf_udata = *ud; @@ -630,7 +635,7 @@ static void start_read_threads(size_t num) /* {{{ */ if (read_threads != NULL) return; - read_threads = (pthread_t *)calloc(num, sizeof(pthread_t)); + read_threads = calloc(num, sizeof(*read_threads)); if (read_threads == NULL) { ERROR("plugin: start_read_threads: calloc failed."); return; @@ -649,7 +654,8 @@ static void start_read_threads(size_t num) /* {{{ */ } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "reader#%" PRIu64, (uint64_t)read_threads_num); + ssnprintf(name, sizeof(name), "reader#%" PRIu64, + (uint64_t)read_threads_num); set_thread_name(read_threads[read_threads_num], name); read_threads_num++; @@ -819,7 +825,7 @@ static void start_write_threads(size_t num) /* {{{ */ if (write_threads != NULL) return; - write_threads = (pthread_t *)calloc(num, sizeof(pthread_t)); + write_threads = calloc(num, sizeof(*write_threads)); if (write_threads == NULL) { ERROR("plugin: start_write_threads: calloc failed."); return; @@ -838,8 +844,8 @@ static void start_write_threads(size_t num) /* {{{ */ } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "writer#%" PRIu64, - (uint64_t)write_threads_num); + ssnprintf(name, sizeof(name), "writer#%" PRIu64, + (uint64_t)write_threads_num); set_thread_name(write_threads[write_threads_num], name); write_threads_num++; @@ -908,15 +914,13 @@ void plugin_set_dir(const char *dir) { ERROR("plugin_set_dir: strdup(\"%s\") failed", dir); } -static bool plugin_is_loaded(char const *name) { - int status; - +bool plugin_is_loaded(char const *name) { if (plugins_loaded == NULL) plugins_loaded = c_avl_create((int (*)(const void *, const void *))strcasecmp); assert(plugins_loaded != NULL); - status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); + int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); return status == 0; } @@ -1297,8 +1301,10 @@ EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback, /* name = */ flush_name, /* callback = */ plugin_flush_timeout_callback, /* interval = */ ctx.flush_interval, - /* user data = */ &(user_data_t){ - .data = cb, .free_func = plugin_flush_timeout_callback_free, + /* user data = */ + &(user_data_t){ + .data = cb, + .free_func = plugin_flush_timeout_callback_free, }); sfree(flush_name); @@ -1313,6 +1319,60 @@ EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback, return create_register_callback(&list_missing, name, (void *)callback, ud); } /* int plugin_register_missing */ +EXPORT int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud) { + + if (name == NULL || callback == NULL) + return EINVAL; + + char *name_copy = strdup(name); + if (name_copy == NULL) { + P_ERROR("plugin_register_cache_event: strdup failed."); + free_userdata(ud); + return ENOMEM; + } + + if (list_cache_event_num >= 32) { + P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried " + "to be registered."); + free_userdata(ud); + return ENOMEM; + } + + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + + if (strcmp(name, cef->name) == 0) { + P_ERROR("plugin_register_cache_event: a callback named `%s' already " + "registered!", + name); + free_userdata(ud); + return -1; + } + } + + user_data_t user_data; + if (ud == NULL) { + user_data = (user_data_t){ + .data = NULL, .free_func = NULL, + }; + } else { + user_data = *ud; + } + + list_cache_event[list_cache_event_num] = + (cache_event_func_t){.callback = callback, + .name = name_copy, + .user_data = user_data, + .plugin_ctx = plugin_get_ctx()}; + list_cache_event_num++; + + return 0; +} /* int plugin_register_cache_event */ + EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) { return create_register_callback(&list_shutdown, name, (void *)callback, NULL); } /* int plugin_register_shutdown */ @@ -1514,6 +1574,32 @@ EXPORT int plugin_unregister_missing(const char *name) { return plugin_unregister(list_missing, name); } +EXPORT int plugin_unregister_cache_event(const char *name) { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + if (strcmp(name, cef->name) == 0) { + /* Mark callback as inactive, so mask in cache entries remains actual */ + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + return 0; +} + +static void destroy_cache_event_callbacks() { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } +} + EXPORT int plugin_unregister_shutdown(const char *name) { return plugin_unregister(list_shutdown, name); } @@ -1858,6 +1944,7 @@ EXPORT int plugin_shutdown_all(void) { * the data isn't freed twice. */ destroy_all_callbacks(&list_flush); destroy_all_callbacks(&list_missing); + destroy_cache_event_callbacks(); destroy_all_callbacks(&list_write); destroy_all_callbacks(&list_notification); @@ -1898,6 +1985,82 @@ EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ return 0; } /* int }}} plugin_dispatch_missing */ +void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl) { + switch (event_type) { + case CE_VALUE_NEW: + callbacks_mask = 0; + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event NEW.", + cef->name, status); + } else { + if (event.ret) { + DEBUG( + "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.", + cef->name, name); + callbacks_mask |= (1 << (i)); + } else { + DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.", + cef->name, name); + } + } + } + + if (callbacks_mask) + uc_set_callbacks_mask(name, callbacks_mask); + + break; + case CE_VALUE_UPDATE: + case CE_VALUE_EXPIRED: + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + if (callbacks_mask && (1 << (i)) == 0) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event %s.", + cef->name, status, + ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED")); + } + } + break; + } + return; +} + static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;