#include "collectd.h"
-#include "common.h"
#include "configfile.h"
#include "filter_chain.h"
#include "plugin.h"
-#include "utils_avltree.h"
+#include "utils/avltree/avltree.h"
+#include "utils/common/common.h"
+#include "utils/heap/heap.h"
#include "utils_cache.h"
#include "utils_complain.h"
-#include "utils_heap.h"
#include "utils_llist.h"
#include "utils_random.h"
#include "utils_time.h"
};
typedef struct read_func_s read_func_t;
+struct cache_event_func_s {
+ plugin_cache_event_cb callback;
+ char *name;
+ user_data_t user_data;
+ plugin_ctx_t plugin_ctx;
+};
+typedef struct cache_event_func_s cache_event_func_t;
+
struct write_queue_s;
typedef struct write_queue_s write_queue_t;
struct write_queue_s {
static llist_t *list_log;
static llist_t *list_notification;
+static size_t list_cache_event_num;
+static cache_event_func_t list_cache_event[32];
+
static fc_chain_t *pre_cache_chain;
static fc_chain_t *post_cache_chain;
static int register_callback(llist_t **list, /* {{{ */
const char *name, callback_func_t *cf) {
- llentry_t *le;
- char *key;
if (*list == NULL) {
*list = llist_create();
}
}
- key = strdup(name);
+ char *key = strdup(name);
if (key == NULL) {
ERROR("plugin: register_callback: strdup failed.");
destroy_callback(cf);
return -1;
}
- le = llist_search(*list, name);
+ llentry_t *le = llist_search(*list, name);
if (le == NULL) {
le = llentry_create(key, cf);
if (le == NULL) {
llist_append(*list, le);
} else {
- callback_func_t *old_cf;
-
- old_cf = le->value;
+ callback_func_t *old_cf = le->value;
le->value = cf;
P_WARNING("register_callback: "
cf->cf_callback = callback;
if (ud == NULL) {
cf->cf_udata = (user_data_t){
- .data = NULL, .free_func = NULL,
+ .data = NULL,
+ .free_func = NULL,
};
} else {
cf->cf_udata = *ud;
if (read_threads != NULL)
return;
- read_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
+ read_threads = calloc(num, sizeof(*read_threads));
if (read_threads == NULL) {
ERROR("plugin: start_read_threads: calloc failed.");
return;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "reader#%" PRIu64, (uint64_t)read_threads_num);
+ ssnprintf(name, sizeof(name), "reader#%" PRIu64,
+ (uint64_t)read_threads_num);
set_thread_name(read_threads[read_threads_num], name);
read_threads_num++;
if (write_threads != NULL)
return;
- write_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
+ write_threads = calloc(num, sizeof(*write_threads));
if (write_threads == NULL) {
ERROR("plugin: start_write_threads: calloc failed.");
return;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "writer#%" PRIu64,
- (uint64_t)write_threads_num);
+ ssnprintf(name, sizeof(name), "writer#%" PRIu64,
+ (uint64_t)write_threads_num);
set_thread_name(write_threads[write_threads_num], name);
write_threads_num++;
ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
}
-static bool plugin_is_loaded(char const *name) {
- int status;
-
+bool plugin_is_loaded(char const *name) {
if (plugins_loaded == NULL)
plugins_loaded =
c_avl_create((int (*)(const void *, const void *))strcasecmp);
assert(plugins_loaded != NULL);
- status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
+ int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
return status == 0;
}
/* name = */ flush_name,
/* callback = */ plugin_flush_timeout_callback,
/* interval = */ ctx.flush_interval,
- /* user data = */ &(user_data_t){
- .data = cb, .free_func = plugin_flush_timeout_callback_free,
+ /* user data = */
+ &(user_data_t){
+ .data = cb,
+ .free_func = plugin_flush_timeout_callback_free,
});
sfree(flush_name);
return create_register_callback(&list_missing, name, (void *)callback, ud);
} /* int plugin_register_missing */
+EXPORT int plugin_register_cache_event(const char *name,
+ plugin_cache_event_cb callback,
+ user_data_t const *ud) {
+
+ if (name == NULL || callback == NULL)
+ return EINVAL;
+
+ char *name_copy = strdup(name);
+ if (name_copy == NULL) {
+ P_ERROR("plugin_register_cache_event: strdup failed.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ if (list_cache_event_num >= 32) {
+ P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
+ "to be registered.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+
+ if (strcmp(name, cef->name) == 0) {
+ P_ERROR("plugin_register_cache_event: a callback named `%s' already "
+ "registered!",
+ name);
+ free_userdata(ud);
+ return -1;
+ }
+ }
+
+ user_data_t user_data;
+ if (ud == NULL) {
+ user_data = (user_data_t){
+ .data = NULL, .free_func = NULL,
+ };
+ } else {
+ user_data = *ud;
+ }
+
+ list_cache_event[list_cache_event_num] =
+ (cache_event_func_t){.callback = callback,
+ .name = name_copy,
+ .user_data = user_data,
+ .plugin_ctx = plugin_get_ctx()};
+ list_cache_event_num++;
+
+ return 0;
+} /* int plugin_register_cache_event */
+
EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
} /* int plugin_register_shutdown */
return plugin_unregister(list_missing, name);
}
+EXPORT int plugin_unregister_cache_event(const char *name) {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ if (strcmp(name, cef->name) == 0) {
+ /* Mark callback as inactive, so mask in cache entries remains actual */
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+ }
+ return 0;
+}
+
+static void destroy_cache_event_callbacks() {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+}
+
EXPORT int plugin_unregister_shutdown(const char *name) {
return plugin_unregister(list_shutdown, name);
}
* the data isn't freed twice. */
destroy_all_callbacks(&list_flush);
destroy_all_callbacks(&list_missing);
+ destroy_cache_event_callbacks();
destroy_all_callbacks(&list_write);
destroy_all_callbacks(&list_notification);
return 0;
} /* int }}} plugin_dispatch_missing */
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+ unsigned long callbacks_mask, const char *name,
+ const value_list_t *vl) {
+ switch (event_type) {
+ case CE_VALUE_NEW:
+ callbacks_mask = 0;
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event NEW.",
+ cef->name, status);
+ } else {
+ if (event.ret) {
+ DEBUG(
+ "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
+ cef->name, name);
+ callbacks_mask |= (1 << (i));
+ } else {
+ DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
+ cef->name, name);
+ }
+ }
+ }
+
+ if (callbacks_mask)
+ uc_set_callbacks_mask(name, callbacks_mask);
+
+ break;
+ case CE_VALUE_UPDATE:
+ case CE_VALUE_EXPIRED:
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ if (callbacks_mask && (1 << (i)) == 0)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event %s.",
+ cef->name, status,
+ ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
+ }
+ }
+ break;
+ }
+ return;
+}
+
static int plugin_dispatch_values_internal(value_list_t *vl) {
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;