X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fdaemon%2Fplugin.c;fp=src%2Fdaemon%2Fplugin.c;h=10a2064827086684bc742af5bc056ff797a7dce4;hp=b4e5ae7261677657029577f366540359e695567f;hb=64b9342a6c0ac420f3a01096fc319ce1d1ae8746;hpb=090afcd01854ef4af46064fe59c0bf71ec83d345 diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index b4e5ae72..10a20648 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -85,6 +85,14 @@ struct read_func_s { }; typedef struct read_func_s read_func_t; +struct cache_event_func_s { + plugin_cache_event_cb callback; + char *name; + user_data_t user_data; + plugin_ctx_t plugin_ctx; +}; +typedef struct cache_event_func_s cache_event_func_t; + struct write_queue_s; typedef struct write_queue_s write_queue_t; struct write_queue_s { @@ -112,6 +120,9 @@ static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; +static size_t list_cache_event_num; +static cache_event_func_t list_cache_event[32]; + static fc_chain_t *pre_cache_chain; static fc_chain_t *post_cache_chain; @@ -263,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */ static int register_callback(llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { - llentry_t *le; - char *key; if (*list == NULL) { *list = llist_create(); @@ -276,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */ } } - key = strdup(name); + char *key = strdup(name); if (key == NULL) { ERROR("plugin: register_callback: strdup failed."); destroy_callback(cf); return -1; } - le = llist_search(*list, name); + llentry_t *le = llist_search(*list, name); if (le == NULL) { le = llentry_create(key, cf); if (le == NULL) { @@ -296,9 +305,7 @@ static int register_callback(llist_t **list, /* {{{ */ llist_append(*list, le); } else { - callback_func_t *old_cf; - - old_cf = le->value; + callback_func_t *old_cf = le->value; le->value = cf; P_WARNING("register_callback: " @@ -1310,6 +1317,60 @@ EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback, return create_register_callback(&list_missing, name, (void *)callback, ud); } /* int plugin_register_missing */ +EXPORT int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud) { + + if (name == NULL || callback == NULL) + return EINVAL; + + char *name_copy = strdup(name); + if (name_copy == NULL) { + P_ERROR("plugin_register_cache_event: strdup failed."); + free_userdata(ud); + return ENOMEM; + } + + if (list_cache_event_num >= 32) { + P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried " + "to be registered."); + free_userdata(ud); + return ENOMEM; + } + + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + + if (strcmp(name, cef->name) == 0) { + P_ERROR("plugin_register_cache_event: a callback named `%s' already " + "registered!", + name); + free_userdata(ud); + return -1; + } + } + + user_data_t user_data; + if (ud == NULL) { + user_data = (user_data_t){ + .data = NULL, .free_func = NULL, + }; + } else { + user_data = *ud; + } + + list_cache_event[list_cache_event_num] = + (cache_event_func_t){.callback = callback, + .name = name_copy, + .user_data = user_data, + .plugin_ctx = plugin_get_ctx()}; + list_cache_event_num++; + + return 0; +} /* int plugin_register_cache_event */ + EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) { return create_register_callback(&list_shutdown, name, (void *)callback, NULL); } /* int plugin_register_shutdown */ @@ -1511,6 +1572,32 @@ EXPORT int plugin_unregister_missing(const char *name) { return plugin_unregister(list_missing, name); } +EXPORT int plugin_unregister_cache_event(const char *name) { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + if (strcmp(name, cef->name) == 0) { + /* Mark callback as inactive, so mask in cache entries remains actual */ + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + return 0; +} + +static void destroy_cache_event_callbacks() { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } +} + EXPORT int plugin_unregister_shutdown(const char *name) { return plugin_unregister(list_shutdown, name); } @@ -1855,6 +1942,7 @@ EXPORT int plugin_shutdown_all(void) { * the data isn't freed twice. */ destroy_all_callbacks(&list_flush); destroy_all_callbacks(&list_missing); + destroy_cache_event_callbacks(); destroy_all_callbacks(&list_write); destroy_all_callbacks(&list_notification); @@ -1895,6 +1983,82 @@ EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ return 0; } /* int }}} plugin_dispatch_missing */ +void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl) { + switch (event_type) { + case CE_VALUE_NEW: + callbacks_mask = 0; + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event NEW.", + cef->name, status); + } else { + if (event.ret) { + DEBUG( + "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.", + cef->name, name); + callbacks_mask |= (1 << (i)); + } else { + DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.", + cef->name, name); + } + } + } + + if (callbacks_mask) + uc_set_callbacks_mask(name, callbacks_mask); + + break; + case CE_VALUE_UPDATE: + case CE_VALUE_EXPIRED: + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + if (callbacks_mask && (1 << (i)) == 0) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event %s.", + cef->name, status, + ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED")); + } + } + break; + } + return; +} + static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;