X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fdaemon%2Fplugin.c;h=52cb0a4b7174b5344fa8a31c3082f11a9a17c6e9;hp=9b75f698adff4f630a6114e2652e0ded5c882fac;hb=bf1c2612bc0405c895f754ebfbb24484122c7cfa;hpb=1159cb5d383c55a80a0db100b8f7aadcf44740a5 diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index 9b75f698..52cb0a4b 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -30,18 +30,26 @@ #include "collectd.h" -#include "common.h" #include "configfile.h" #include "filter_chain.h" #include "plugin.h" -#include "utils_avltree.h" +#include "utils/avltree/avltree.h" +#include "utils/common/common.h" +#include "utils/heap/heap.h" #include "utils_cache.h" #include "utils_complain.h" -#include "utils_heap.h" #include "utils_llist.h" #include "utils_random.h" #include "utils_time.h" +#ifdef WIN32 +#define EXPORT __declspec(dllexport) +#include +#include +#else +#define EXPORT +#endif + #if HAVE_PTHREAD_NP_H #include /* for pthread_set_name_np(3) */ #endif @@ -77,6 +85,14 @@ struct read_func_s { }; typedef struct read_func_s read_func_t; +struct cache_event_func_s { + plugin_cache_event_cb callback; + char *name; + user_data_t user_data; + plugin_ctx_t plugin_ctx; +}; +typedef struct cache_event_func_s cache_event_func_t; + struct write_queue_s; typedef struct write_queue_s write_queue_t; struct write_queue_s { @@ -94,7 +110,7 @@ typedef struct flush_callback_s flush_callback_t; /* * Private variables */ -static c_avl_tree_t *plugins_loaded = NULL; +static c_avl_tree_t *plugins_loaded; static llist_t *list_init; static llist_t *list_write; @@ -104,42 +120,46 @@ static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; -static fc_chain_t *pre_cache_chain = NULL; -static fc_chain_t *post_cache_chain = NULL; +static size_t list_cache_event_num; +static cache_event_func_t list_cache_event[32]; + +static fc_chain_t *pre_cache_chain; +static fc_chain_t *post_cache_chain; static c_avl_tree_t *data_sets; -static char *plugindir = NULL; +static char *plugindir; #ifndef DEFAULT_MAX_READ_INTERVAL #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400) #endif -static c_heap_t *read_heap = NULL; +static c_heap_t *read_heap; static llist_t *read_list; static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *read_threads = NULL; -static size_t read_threads_num = 0; +static pthread_t *read_threads; +static size_t read_threads_num; static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; -static long write_queue_length = 0; -static _Bool write_loop = 1; +static long write_queue_length; +static bool write_loop = true; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *write_threads = NULL; -static size_t write_threads_num = 0; +static pthread_t *write_threads; +static size_t write_threads_num; static pthread_key_t plugin_ctx_key; -static _Bool plugin_ctx_key_initialized = 0; +static bool plugin_ctx_key_initialized; -static long write_limit_high = 0; -static long write_limit_low = 0; +static long write_limit_high; +static long write_limit_low; -static derive_t stats_values_dropped = 0; -static _Bool record_statistics = 0; +static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER; +static derive_t stats_values_dropped; +static bool record_statistics; /* * Static functions @@ -254,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */ static int register_callback(llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { - llentry_t *le; - char *key; if (*list == NULL) { *list = llist_create(); @@ -267,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */ } } - key = strdup(name); + char *key = strdup(name); if (key == NULL) { ERROR("plugin: register_callback: strdup failed."); destroy_callback(cf); return -1; } - le = llist_search(*list, name); + llentry_t *le = llist_search(*list, name); if (le == NULL) { le = llentry_create(key, cf); if (le == NULL) { @@ -287,15 +305,13 @@ static int register_callback(llist_t **list, /* {{{ */ llist_append(*list, le); } else { - callback_func_t *old_cf; - - old_cf = le->value; + callback_func_t *old_cf = le->value; le->value = cf; - WARNING("plugin: register_callback: " - "a callback named `%s' already exists - " - "overwriting the old entry!", - name); + P_WARNING("register_callback: " + "a callback named `%s' already exists - " + "overwriting the old entry!", + name); destroy_callback(old_cf); sfree(key); @@ -311,19 +327,16 @@ static void log_list_callbacks(llist_t **list, /* {{{ */ int i; llentry_t *le; int n; - char **keys; n = llist_size(*list); if (n == 0) { - INFO("%s [none]", comment); + INFO("%s: [none]", comment); return; } - keys = calloc(n, sizeof(char *)); - + char **keys = calloc(n, sizeof(*keys)); if (keys == NULL) { ERROR("%s: failed to allocate memory for list of callbacks", comment); - return; } @@ -346,19 +359,23 @@ static void log_list_callbacks(llist_t **list, /* {{{ */ static int create_register_callback(llist_t **list, /* {{{ */ const char *name, void *callback, user_data_t const *ud) { - callback_func_t *cf; - cf = calloc(1, sizeof(*cf)); + if (name == NULL || callback == NULL) + return EINVAL; + + callback_func_t *cf = calloc(1, sizeof(*cf)); if (cf == NULL) { free_userdata(ud); ERROR("plugin: create_register_callback: calloc failed."); - return -1; + return ENOMEM; } cf->cf_callback = callback; if (ud == NULL) { - cf->cf_udata.data = NULL; - cf->cf_udata.free_func = NULL; + cf->cf_udata = (user_data_t){ + .data = NULL, + .free_func = NULL, + }; } else { cf->cf_udata = *ud; } @@ -389,50 +406,44 @@ static int plugin_unregister(llist_t *list, const char *name) /* {{{ */ return 0; } /* }}} int plugin_unregister */ -/* - * (Try to) load the shared object `file'. Won't complain if it isn't a shared - * object, but it will bitch about a shared object not having a - * ``module_register'' symbol.. - */ -static int plugin_load_file(const char *file, _Bool global) { - void (*reg_handle)(void); - +/* plugin_load_file loads the shared object "file" and calls its + * "module_register" function. Returns zero on success, non-zero otherwise. */ +static int plugin_load_file(char const *file, bool global) { int flags = RTLD_NOW; if (global) flags |= RTLD_GLOBAL; void *dlh = dlopen(file, flags); - if (dlh == NULL) { char errbuf[1024] = ""; snprintf(errbuf, sizeof(errbuf), - "dlopen (\"%s\") failed: %s. " - "The most common cause for this problem is " - "missing dependencies. Use ldd(1) to check " - "the dependencies of the plugin " - "/ shared object.", + "dlopen(\"%s\") failed: %s. " + "The most common cause for this problem is missing dependencies. " + "Use ldd(1) to check the dependencies of the plugin / shared " + "object.", file, dlerror()); - ERROR("%s", errbuf); - /* Make sure this is printed to STDERR in any case, but also - * make sure it's printed only once. */ - if (list_log != NULL) - fprintf(stderr, "ERROR: %s\n", errbuf); + /* This error is printed to STDERR unconditionally. If list_log is NULL, + * plugin_log() will also print to STDERR. We avoid duplicate output by + * checking that the list of log handlers, list_log, is not NULL. */ + fprintf(stderr, "ERROR: %s\n", errbuf); + if (list_log != NULL) { + ERROR("%s", errbuf); + } - return 1; + return ENOENT; } - reg_handle = (void (*)(void))dlsym(dlh, "module_register"); + void (*reg_handle)(void) = dlsym(dlh, "module_register"); if (reg_handle == NULL) { - WARNING("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file, - dlerror()); + ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file, + dlerror()); dlclose(dlh); - return -1; + return ENOENT; } (*reg_handle)(); - return 0; } @@ -610,9 +621,7 @@ static void set_thread_name(pthread_t tid, char const *name) { #if defined(HAVE_PTHREAD_SETNAME_NP) int status = pthread_setname_np(tid, n); if (status != 0) { - char errbuf[1024]; - ERROR("set_thread_name(\"%s\"): %s", n, - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("set_thread_name(\"%s\"): %s", n, STRERROR(status)); } #else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */ pthread_set_name_np(tid, n); @@ -626,7 +635,7 @@ static void start_read_threads(size_t num) /* {{{ */ if (read_threads != NULL) return; - read_threads = (pthread_t *)calloc(num, sizeof(pthread_t)); + read_threads = calloc(num, sizeof(*read_threads)); if (read_threads == NULL) { ERROR("plugin: start_read_threads: calloc failed."); return; @@ -638,15 +647,15 @@ static void start_read_threads(size_t num) /* {{{ */ /* attr = */ NULL, plugin_read_thread, /* arg = */ NULL); if (status != 0) { - char errbuf[1024]; - ERROR("plugin: start_read_threads: pthread_create failed " - "with status %i (%s).", - status, sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("plugin: start_read_threads: pthread_create failed with status %i " + "(%s).", + status, STRERROR(status)); return; } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "reader#%zu", read_threads_num); + ssnprintf(name, sizeof(name), "reader#%" PRIu64, + (uint64_t)read_threads_num); set_thread_name(read_threads[read_threads_num], name); read_threads_num++; @@ -657,7 +666,7 @@ static void stop_read_threads(void) { if (read_threads == NULL) return; - INFO("collectd: Stopping %zu read threads.", read_threads_num); + INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num); pthread_mutex_lock(&read_lock); read_loop = 0; @@ -719,25 +728,8 @@ plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */ vl->time = cdtime(); /* Fill in the interval from the thread context, if it is zero. */ - if (vl->interval == 0) { - plugin_ctx_t ctx = plugin_get_ctx(); - - if (ctx.interval != 0) - vl->interval = ctx.interval; - else { - char name[6 * DATA_MAX_NAME_LEN]; - FORMAT_VL(name, sizeof(name), vl); - ERROR("plugin_value_list_clone: Unable to determine " - "interval from context for " - "value list \"%s\". " - "This indicates a broken plugin. " - "Please report this problem to the " - "collectd mailing list or at " - ".", - name); - vl->interval = cf_get_default_interval(); - } - } + if (vl->interval == 0) + vl->interval = plugin_get_interval(); return vl; } /* }}} value_list_t *plugin_value_list_clone */ @@ -833,7 +825,7 @@ static void start_write_threads(size_t num) /* {{{ */ if (write_threads != NULL) return; - write_threads = (pthread_t *)calloc(num, sizeof(pthread_t)); + write_threads = calloc(num, sizeof(*write_threads)); if (write_threads == NULL) { ERROR("plugin: start_write_threads: calloc failed."); return; @@ -845,15 +837,15 @@ static void start_write_threads(size_t num) /* {{{ */ /* attr = */ NULL, plugin_write_thread, /* arg = */ NULL); if (status != 0) { - char errbuf[1024]; - ERROR("plugin: start_write_threads: pthread_create failed " - "with status %i (%s).", - status, sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("plugin: start_write_threads: pthread_create failed with status %i " + "(%s).", + status, STRERROR(status)); return; } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "writer#%zu", write_threads_num); + ssnprintf(name, sizeof(name), "writer#%" PRIu64, + (uint64_t)write_threads_num); set_thread_name(write_threads[write_threads_num], name); write_threads_num++; @@ -868,10 +860,10 @@ static void stop_write_threads(void) /* {{{ */ if (write_threads == NULL) return; - INFO("collectd: Stopping %zu write threads.", write_threads_num); + INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num); pthread_mutex_lock(&write_lock); - write_loop = 0; + write_loop = false; DEBUG("plugin: stop_write_threads: Signalling `write_cond'"); pthread_cond_broadcast(&write_cond); pthread_mutex_unlock(&write_lock); @@ -900,7 +892,7 @@ static void stop_write_threads(void) /* {{{ */ pthread_mutex_unlock(&write_lock); if (i > 0) { - WARNING("plugin: %zu value list%s left after shutting down " + WARNING("plugin: %" PRIsz " value list%s left after shutting down " "the write threads.", i, (i == 1) ? " was" : "s were"); } @@ -922,15 +914,13 @@ void plugin_set_dir(const char *dir) { ERROR("plugin_set_dir: strdup(\"%s\") failed", dir); } -static _Bool plugin_is_loaded(char const *name) { - int status; - +bool plugin_is_loaded(char const *name) { if (plugins_loaded == NULL) plugins_loaded = c_avl_create((int (*)(const void *, const void *))strcasecmp); assert(plugins_loaded != NULL); - status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); + int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); return status == 0; } @@ -964,7 +954,12 @@ static void plugin_free_loaded(void) { } #define BUFSIZE 512 -int plugin_load(char const *plugin_name, _Bool global) { +#ifdef WIN32 +#define SHLIB_SUFFIX ".dll" +#else +#define SHLIB_SUFFIX ".so" +#endif +int plugin_load(char const *plugin_name, bool global) { DIR *dh; const char *dir; char filename[BUFSIZE] = ""; @@ -998,20 +993,19 @@ int plugin_load(char const *plugin_name, _Bool global) { */ if ((strcasecmp("perl", plugin_name) == 0) || (strcasecmp("python", plugin_name) == 0)) - global = 1; + global = true; - /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the + /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the * type when matching the filename */ - status = snprintf(typename, sizeof(typename), "%s.so", plugin_name); + status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name); if ((status < 0) || ((size_t)status >= sizeof(typename))) { - WARNING("plugin_load: Filename too long: \"%s.so\"", plugin_name); + WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"", + plugin_name); return -1; } if ((dh = opendir(dir)) == NULL) { - char errbuf[1024]; - ERROR("plugin_load: opendir (%s) failed: %s", dir, - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO); return -1; } @@ -1026,9 +1020,7 @@ int plugin_load(char const *plugin_name, _Bool global) { } if (lstat(filename, &statbuf) == -1) { - char errbuf[1024]; - WARNING("plugin_load: stat (\"%s\") failed: %s", filename, - sstrerror(errno, errbuf, sizeof(errbuf))); + WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO); continue; } else if (!S_ISREG(statbuf.st_mode)) { /* don't follow symlinks */ @@ -1061,19 +1053,20 @@ int plugin_load(char const *plugin_name, _Bool global) { /* * The `register_*' functions follow */ -int plugin_register_config(const char *name, - int (*callback)(const char *key, const char *val), - const char **keys, int keys_num) { +EXPORT int plugin_register_config(const char *name, + int (*callback)(const char *key, + const char *val), + const char **keys, int keys_num) { cf_register(name, callback, keys, keys_num); return 0; } /* int plugin_register_config */ -int plugin_register_complex_config(const char *type, - int (*callback)(oconfig_item_t *)) { +EXPORT int plugin_register_complex_config(const char *type, + int (*callback)(oconfig_item_t *)) { return cf_register_complex(type, callback); } /* int plugin_register_complex_config */ -int plugin_register_init(const char *name, int (*callback)(void)) { +EXPORT int plugin_register_init(const char *name, int (*callback)(void)) { return create_register_callback(&list_init, name, (void *)callback, NULL); } /* plugin_register_init */ @@ -1125,9 +1118,9 @@ static int plugin_insert_read(read_func_t *rf) { le = llist_search(read_list, rf->rf_name); if (le != NULL) { pthread_mutex_unlock(&read_lock); - WARNING("The read function \"%s\" is already registered. " - "Check for duplicates in your configuration!", - rf->rf_name); + P_WARNING("The read function \"%s\" is already registered. " + "Check for duplicates in your configuration!", + rf->rf_name); return EINVAL; } @@ -1155,7 +1148,7 @@ static int plugin_insert_read(read_func_t *rf) { return 0; } /* int plugin_insert_read */ -int plugin_register_read(const char *name, int (*callback)(void)) { +EXPORT int plugin_register_read(const char *name, int (*callback)(void)) { read_func_t *rf; int status; @@ -1173,6 +1166,7 @@ int plugin_register_read(const char *name, int (*callback)(void)) { rf->rf_name = strdup(name); rf->rf_type = RF_SIMPLE; rf->rf_interval = plugin_get_interval(); + rf->rf_ctx.interval = rf->rf_interval; status = plugin_insert_read(rf); if (status != 0) { @@ -1183,9 +1177,10 @@ int plugin_register_read(const char *name, int (*callback)(void)) { return status; } /* int plugin_register_read */ -int plugin_register_complex_read(const char *group, const char *name, - plugin_read_cb callback, cdtime_t interval, - user_data_t const *user_data) { +EXPORT int plugin_register_complex_read(const char *group, const char *name, + plugin_read_cb callback, + cdtime_t interval, + user_data_t const *user_data) { read_func_t *rf; int status; @@ -1214,6 +1209,7 @@ int plugin_register_complex_read(const char *group, const char *name, } rf->rf_ctx = plugin_get_ctx(); + rf->rf_ctx.interval = rf->rf_interval; status = plugin_insert_read(rf); if (status != 0) { @@ -1225,8 +1221,8 @@ int plugin_register_complex_read(const char *group, const char *name, return status; } /* int plugin_register_complex_read */ -int plugin_register_write(const char *name, plugin_write_cb callback, - user_data_t const *ud) { +EXPORT int plugin_register_write(const char *name, plugin_write_cb callback, + user_data_t const *ud) { return create_register_callback(&list_write, name, (void *)callback, ud); } /* int plugin_register_write */ @@ -1267,8 +1263,8 @@ static char *plugin_flush_callback_name(const char *name) { return flush_name; } /* static char *plugin_flush_callback_name */ -int plugin_register_flush(const char *name, plugin_flush_cb callback, - user_data_t const *ud) { +EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback, + user_data_t const *ud) { int status; plugin_ctx_t ctx = plugin_get_ctx(); @@ -1305,8 +1301,10 @@ int plugin_register_flush(const char *name, plugin_flush_cb callback, /* name = */ flush_name, /* callback = */ plugin_flush_timeout_callback, /* interval = */ ctx.flush_interval, - /* user data = */ &(user_data_t){ - .data = cb, .free_func = plugin_flush_timeout_callback_free, + /* user data = */ + &(user_data_t){ + .data = cb, + .free_func = plugin_flush_timeout_callback_free, }); sfree(flush_name); @@ -1316,12 +1314,66 @@ int plugin_register_flush(const char *name, plugin_flush_cb callback, return 0; } /* int plugin_register_flush */ -int plugin_register_missing(const char *name, plugin_missing_cb callback, - user_data_t const *ud) { +EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback, + user_data_t const *ud) { return create_register_callback(&list_missing, name, (void *)callback, ud); } /* int plugin_register_missing */ -int plugin_register_shutdown(const char *name, int (*callback)(void)) { +EXPORT int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud) { + + if (name == NULL || callback == NULL) + return EINVAL; + + char *name_copy = strdup(name); + if (name_copy == NULL) { + P_ERROR("plugin_register_cache_event: strdup failed."); + free_userdata(ud); + return ENOMEM; + } + + if (list_cache_event_num >= 32) { + P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried " + "to be registered."); + free_userdata(ud); + return ENOMEM; + } + + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + + if (strcmp(name, cef->name) == 0) { + P_ERROR("plugin_register_cache_event: a callback named `%s' already " + "registered!", + name); + free_userdata(ud); + return -1; + } + } + + user_data_t user_data; + if (ud == NULL) { + user_data = (user_data_t){ + .data = NULL, .free_func = NULL, + }; + } else { + user_data = *ud; + } + + list_cache_event[list_cache_event_num] = + (cache_event_func_t){.callback = callback, + .name = name_copy, + .user_data = user_data, + .plugin_ctx = plugin_get_ctx()}; + list_cache_event_num++; + + return 0; +} /* int plugin_register_cache_event */ + +EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) { return create_register_callback(&list_shutdown, name, (void *)callback, NULL); } /* int plugin_register_shutdown */ @@ -1344,7 +1396,7 @@ static void plugin_free_data_sets(void) { data_sets = NULL; } /* void plugin_free_data_sets */ -int plugin_register_data_set(const data_set_t *ds) { +EXPORT int plugin_register_data_set(const data_set_t *ds) { data_set_t *ds_copy; if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) { @@ -1373,33 +1425,33 @@ int plugin_register_data_set(const data_set_t *ds) { return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy); } /* int plugin_register_data_set */ -int plugin_register_log(const char *name, plugin_log_cb callback, - user_data_t const *ud) { +EXPORT int plugin_register_log(const char *name, plugin_log_cb callback, + user_data_t const *ud) { return create_register_callback(&list_log, name, (void *)callback, ud); } /* int plugin_register_log */ -int plugin_register_notification(const char *name, - plugin_notification_cb callback, - user_data_t const *ud) { +EXPORT int plugin_register_notification(const char *name, + plugin_notification_cb callback, + user_data_t const *ud) { return create_register_callback(&list_notification, name, (void *)callback, ud); } /* int plugin_register_log */ -int plugin_unregister_config(const char *name) { +EXPORT int plugin_unregister_config(const char *name) { cf_unregister(name); return 0; } /* int plugin_unregister_config */ -int plugin_unregister_complex_config(const char *name) { +EXPORT int plugin_unregister_complex_config(const char *name) { cf_unregister_complex(name); return 0; } /* int plugin_unregister_complex_config */ -int plugin_unregister_init(const char *name) { +EXPORT int plugin_unregister_init(const char *name) { return plugin_unregister(list_init, name); } -int plugin_unregister_read(const char *name) /* {{{ */ +EXPORT int plugin_unregister_read(const char *name) /* {{{ */ { llentry_t *le; read_func_t *rf; @@ -1436,7 +1488,7 @@ int plugin_unregister_read(const char *name) /* {{{ */ return 0; } /* }}} int plugin_unregister_read */ -void plugin_log_available_writers(void) { +EXPORT void plugin_log_available_writers(void) { log_list_callbacks(&list_write, "Available write targets:"); } @@ -1448,7 +1500,7 @@ static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */ return strcmp(rf->rf_group, (const char *)group); } /* }}} int compare_read_func_group */ -int plugin_unregister_read_group(const char *group) /* {{{ */ +EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */ { llentry_t *le; read_func_t *rf; @@ -1498,11 +1550,11 @@ int plugin_unregister_read_group(const char *group) /* {{{ */ return 0; } /* }}} int plugin_unregister_read_group */ -int plugin_unregister_write(const char *name) { +EXPORT int plugin_unregister_write(const char *name) { return plugin_unregister(list_write, name); } -int plugin_unregister_flush(const char *name) { +EXPORT int plugin_unregister_flush(const char *name) { plugin_ctx_t ctx = plugin_get_ctx(); if (ctx.flush_interval != 0) { @@ -1518,15 +1570,41 @@ int plugin_unregister_flush(const char *name) { return plugin_unregister(list_flush, name); } -int plugin_unregister_missing(const char *name) { +EXPORT int plugin_unregister_missing(const char *name) { return plugin_unregister(list_missing, name); } -int plugin_unregister_shutdown(const char *name) { +EXPORT int plugin_unregister_cache_event(const char *name) { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + if (strcmp(name, cef->name) == 0) { + /* Mark callback as inactive, so mask in cache entries remains actual */ + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + return 0; +} + +static void destroy_cache_event_callbacks() { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } +} + +EXPORT int plugin_unregister_shutdown(const char *name) { return plugin_unregister(list_shutdown, name); } -int plugin_unregister_data_set(const char *name) { +EXPORT int plugin_unregister_data_set(const char *name) { data_set_t *ds; if (data_sets == NULL) @@ -1541,15 +1619,15 @@ int plugin_unregister_data_set(const char *name) { return 0; } /* int plugin_unregister_data_set */ -int plugin_unregister_log(const char *name) { +EXPORT int plugin_unregister_log(const char *name) { return plugin_unregister(list_log, name); } -int plugin_unregister_notification(const char *name) { +EXPORT int plugin_unregister_notification(const char *name) { return plugin_unregister(list_notification, name); } -int plugin_init_all(void) { +EXPORT int plugin_init_all(void) { char const *chain_name; llentry_t *le; int status; @@ -1559,7 +1637,7 @@ int plugin_init_all(void) { uc_init(); if (IS_TRUE(global_option_get("CollectInternalStats"))) { - record_statistics = 1; + record_statistics = true; plugin_register_read("collectd", plugin_update_internal_statistics); } @@ -1648,14 +1726,14 @@ int plugin_init_all(void) { } /* void plugin_init_all */ /* TODO: Rename this function. */ -void plugin_read_all(void) { +EXPORT void plugin_read_all(void) { uc_check_timeout(); return; } /* void plugin_read_all */ /* Read function called when the `-T' command line argument is given. */ -int plugin_read_all_once(void) { +EXPORT int plugin_read_all_once(void) { int status; int return_status = 0; @@ -1700,8 +1778,8 @@ int plugin_read_all_once(void) { return return_status; } /* int plugin_read_all_once */ -int plugin_write(const char *plugin, /* {{{ */ - const data_set_t *ds, const value_list_t *vl) { +EXPORT int plugin_write(const char *plugin, /* {{{ */ + const data_set_t *ds, const value_list_t *vl) { llentry_t *le; int status; @@ -1728,8 +1806,12 @@ int plugin_write(const char *plugin, /* {{{ */ callback_func_t *cf = le->value; plugin_write_cb callback; - /* do not switch plugin context; rather keep the context (interval) - * information of the calling read plugin */ + /* Keep the read plugin's interval and flush information but update the + * plugin name. */ + plugin_ctx_t old_ctx = plugin_get_ctx(); + plugin_ctx_t ctx = old_ctx; + ctx.name = cf->cf_ctx.name; + plugin_set_ctx(ctx); DEBUG("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; @@ -1739,6 +1821,7 @@ int plugin_write(const char *plugin, /* {{{ */ else success++; + plugin_set_ctx(old_ctx); le = le->next; } @@ -1775,7 +1858,8 @@ int plugin_write(const char *plugin, /* {{{ */ return status; } /* }}} int plugin_write */ -int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) { +EXPORT int plugin_flush(const char *plugin, cdtime_t timeout, + const char *identifier) { llentry_t *le; if (list_flush == NULL) @@ -1805,7 +1889,7 @@ int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) { return 0; } /* int plugin_flush */ -int plugin_shutdown_all(void) { +EXPORT int plugin_shutdown_all(void) { llentry_t *le; int ret = 0; // Assume success. @@ -1860,6 +1944,7 @@ int plugin_shutdown_all(void) { * the data isn't freed twice. */ destroy_all_callbacks(&list_flush); destroy_all_callbacks(&list_missing); + destroy_cache_event_callbacks(); destroy_all_callbacks(&list_write); destroy_all_callbacks(&list_notification); @@ -1871,7 +1956,7 @@ int plugin_shutdown_all(void) { return ret; } /* void plugin_shutdown_all */ -int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ +EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ { if (list_missing == NULL) return 0; @@ -1900,11 +1985,87 @@ int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ return 0; } /* int }}} plugin_dispatch_missing */ +void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl) { + switch (event_type) { + case CE_VALUE_NEW: + callbacks_mask = 0; + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event NEW.", + cef->name, status); + } else { + if (event.ret) { + DEBUG( + "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.", + cef->name, name); + callbacks_mask |= (1 << (i)); + } else { + DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.", + cef->name, name); + } + } + } + + if (callbacks_mask) + uc_set_callbacks_mask(name, callbacks_mask); + + break; + case CE_VALUE_UPDATE: + case CE_VALUE_EXPIRED: + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + if (callbacks_mask && (1 << (i)) == 0) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event %s.", + cef->name, status, + ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED")); + } + } + break; + } + return; +} + static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; - _Bool free_meta_data = 0; + bool free_meta_data = false; assert(vl != NULL); @@ -1924,7 +2085,7 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { * this case matches and targets may add some and the calling function * may not expect (and therefore free) that data. */ if (vl->meta == NULL) - free_meta_data = 1; + free_meta_data = true; if (list_write == NULL) c_complain_once(LOG_WARNING, &no_write_complaint, @@ -1970,8 +2131,8 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { #else if (ds->ds_num != vl->values_len) { ERROR("plugin_dispatch_values: ds->type = %s: " - "(ds->ds_num = %zu) != " - "(vl->values_len = %zu)", + "(ds->ds_num = %" PRIsz ") != " + "(vl->values_len = %" PRIsz ")", ds->type, ds->ds_num, vl->values_len); return -1; } @@ -2008,7 +2169,7 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { } else fc_default_action(ds, vl); - if ((free_meta_data != 0) && (vl->meta != NULL)) { + if ((free_meta_data == true) && (vl->meta != NULL)) { meta_data_destroy(vl->meta); vl->meta = NULL; } @@ -2037,9 +2198,9 @@ static double get_drop_probability(void) /* {{{ */ return (double)pos / (double)size; } /* }}} double get_drop_probability */ -static _Bool check_drop_value(void) /* {{{ */ +static bool check_drop_value(void) /* {{{ */ { - static cdtime_t last_message_time = 0; + static cdtime_t last_message_time; static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; double p; @@ -2047,11 +2208,11 @@ static _Bool check_drop_value(void) /* {{{ */ int status; if (write_limit_high == 0) - return 0; + return false; p = get_drop_probability(); if (p == 0.0) - return 0; + return false; status = pthread_mutex_trylock(&last_message_lock); if (status == 0) { @@ -2068,18 +2229,17 @@ static _Bool check_drop_value(void) /* {{{ */ } if (p == 1.0) - return 1; + return true; q = cdrand_d(); if (q > p) - return 1; + return true; else - return 0; -} /* }}} _Bool check_drop_value */ + return false; +} /* }}} bool check_drop_value */ -int plugin_dispatch_values(value_list_t const *vl) { +EXPORT int plugin_dispatch_values(value_list_t const *vl) { int status; - static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER; if (check_drop_value()) { if (record_statistics) { @@ -2092,10 +2252,9 @@ int plugin_dispatch_values(value_list_t const *vl) { status = plugin_write_enqueue(vl); if (status != 0) { - char errbuf[1024]; - ERROR("plugin_dispatch_values: plugin_write_enqueue failed " - "with status %i (%s).", - status, sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i " + "(%s).", + status, STRERROR(status)); return status; } @@ -2104,12 +2263,21 @@ int plugin_dispatch_values(value_list_t const *vl) { __attribute__((sentinel)) int plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */ - _Bool store_percentage, int store_type, ...) { + bool store_percentage, int store_type, ...) { value_list_t *vl; int failed = 0; gauge_t sum = 0.0; va_list ap; + if (check_drop_value()) { + if (record_statistics) { + pthread_mutex_lock(&statistics_lock); + stats_values_dropped++; + pthread_mutex_unlock(&statistics_lock); + } + return 0; + } + assert(template->values_len == 1); /* Calculate sum for Gauge to calculate percent if needed */ @@ -2177,7 +2345,7 @@ plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */ return failed; } /* }}} int plugin_dispatch_multivalue */ -int plugin_dispatch_notification(const notification_t *notif) { +EXPORT int plugin_dispatch_notification(const notification_t *notif) { llentry_t *le; /* Possible TODO: Add flap detection here */ @@ -2214,7 +2382,7 @@ int plugin_dispatch_notification(const notification_t *notif) { return 0; } /* int plugin_dispatch_notification */ -void plugin_log(int level, const char *format, ...) { +EXPORT void plugin_log(int level, const char *format, ...) { char msg[1024]; va_list ap; llentry_t *le; @@ -2251,6 +2419,21 @@ void plugin_log(int level, const char *format, ...) { } } /* void plugin_log */ +void daemon_log(int level, const char *format, ...) { + char msg[1024] = ""; // Size inherits from plugin_log() + + char const *name = plugin_get_ctx().name; + if (name == NULL) + name = "UNKNOWN"; + + va_list ap; + va_start(ap, format); + vsnprintf(msg, sizeof(msg), format, ap); + va_end(ap); + + plugin_log(level, "%s plugin: %s", name, msg); +} /* void daemon_log */ + int parse_log_severity(const char *severity) { int log_level = -1; @@ -2272,7 +2455,7 @@ int parse_log_severity(const char *severity) { return log_level; } /* int parse_log_severity */ -int parse_notif_severity(const char *severity) { +EXPORT int parse_notif_severity(const char *severity) { int notif_severity = -1; if (strcasecmp(severity, "FAILURE") == 0) @@ -2286,11 +2469,11 @@ int parse_notif_severity(const char *severity) { return notif_severity; } /* int parse_notif_severity */ -const data_set_t *plugin_get_ds(const char *name) { +EXPORT const data_set_t *plugin_get_ds(const char *name) { data_set_t *ds; if (data_sets == NULL) { - ERROR("plugin_get_ds: No data sets are defined yet."); + P_ERROR("plugin_get_ds: No data sets are defined yet."); return NULL; } @@ -2345,7 +2528,7 @@ static int plugin_notification_meta_add(notification_t *n, const char *name, break; } case NM_TYPE_BOOLEAN: { - meta->nm_value.nm_boolean = *((_Bool *)value); + meta->nm_value.nm_boolean = *((bool *)value); break; } default: { @@ -2390,7 +2573,7 @@ int plugin_notification_meta_add_double(notification_t *n, const char *name, } int plugin_notification_meta_add_boolean(notification_t *n, const char *name, - _Bool value) { + bool value) { return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value); } @@ -2462,9 +2645,7 @@ static plugin_ctx_t *plugin_ctx_create(void) { ctx = malloc(sizeof(*ctx)); if (ctx == NULL) { - char errbuf[1024]; - ERROR("Failed to allocate plugin context: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("Failed to allocate plugin context: %s", STRERRNO); return NULL; } @@ -2475,12 +2656,12 @@ static plugin_ctx_t *plugin_ctx_create(void) { return ctx; } /* int plugin_ctx_create */ -void plugin_init_ctx(void) { +EXPORT void plugin_init_ctx(void) { pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor); - plugin_ctx_key_initialized = 1; + plugin_ctx_key_initialized = true; } /* void plugin_init_ctx */ -plugin_ctx_t plugin_get_ctx(void) { +EXPORT plugin_ctx_t plugin_get_ctx(void) { plugin_ctx_t *ctx; assert(plugin_ctx_key_initialized); @@ -2496,7 +2677,7 @@ plugin_ctx_t plugin_get_ctx(void) { return *ctx; } /* plugin_ctx_t plugin_get_ctx */ -plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) { +EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) { plugin_ctx_t *c; plugin_ctx_t old; @@ -2516,13 +2697,15 @@ plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) { return old; } /* void plugin_set_ctx */ -cdtime_t plugin_get_interval(void) { +EXPORT cdtime_t plugin_get_interval(void) { cdtime_t interval; interval = plugin_get_ctx().interval; if (interval > 0) return interval; + P_ERROR("plugin_get_interval: Unable to determine Interval from context."); + return cf_get_default_interval(); } /* cdtime_t plugin_get_interval */