#include "collectd.h"
-#include "common.h"
#include "configfile.h"
#include "filter_chain.h"
#include "plugin.h"
-#include "utils_avltree.h"
+#include "utils/avltree/avltree.h"
+#include "utils/common/common.h"
+#include "utils/heap/heap.h"
#include "utils_cache.h"
#include "utils_complain.h"
-#include "utils_heap.h"
#include "utils_llist.h"
#include "utils_random.h"
#include "utils_time.h"
+#ifdef WIN32
+#define EXPORT __declspec(dllexport)
+#include <sys/stat.h>
+#include <unistd.h>
+#else
+#define EXPORT
+#endif
+
#if HAVE_PTHREAD_NP_H
#include <pthread_np.h> /* for pthread_set_name_np(3) */
#endif
};
typedef struct read_func_s read_func_t;
+struct cache_event_func_s {
+ plugin_cache_event_cb callback;
+ char *name;
+ user_data_t user_data;
+ plugin_ctx_t plugin_ctx;
+};
+typedef struct cache_event_func_s cache_event_func_t;
+
struct write_queue_s;
typedef struct write_queue_s write_queue_t;
struct write_queue_s {
/*
* Private variables
*/
-static c_avl_tree_t *plugins_loaded = NULL;
+static c_avl_tree_t *plugins_loaded;
static llist_t *list_init;
static llist_t *list_write;
static llist_t *list_log;
static llist_t *list_notification;
-static fc_chain_t *pre_cache_chain = NULL;
-static fc_chain_t *post_cache_chain = NULL;
+static size_t list_cache_event_num;
+static cache_event_func_t list_cache_event[32];
+
+static fc_chain_t *pre_cache_chain;
+static fc_chain_t *post_cache_chain;
static c_avl_tree_t *data_sets;
-static char *plugindir = NULL;
+static char *plugindir;
#ifndef DEFAULT_MAX_READ_INTERVAL
#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
#endif
-static c_heap_t *read_heap = NULL;
+static c_heap_t *read_heap;
static llist_t *read_list;
static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t *read_threads = NULL;
-static size_t read_threads_num = 0;
+static pthread_t *read_threads;
+static size_t read_threads_num;
static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
-static long write_queue_length = 0;
-static _Bool write_loop = 1;
+static long write_queue_length;
+static bool write_loop = true;
static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t *write_threads = NULL;
-static size_t write_threads_num = 0;
+static pthread_t *write_threads;
+static size_t write_threads_num;
static pthread_key_t plugin_ctx_key;
-static _Bool plugin_ctx_key_initialized = 0;
+static bool plugin_ctx_key_initialized;
-static long write_limit_high = 0;
-static long write_limit_low = 0;
+static long write_limit_high;
+static long write_limit_low;
-static derive_t stats_values_dropped = 0;
-static _Bool record_statistics = 0;
+static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
+static derive_t stats_values_dropped;
+static bool record_statistics;
/*
* Static functions
static int register_callback(llist_t **list, /* {{{ */
const char *name, callback_func_t *cf) {
- llentry_t *le;
- char *key;
if (*list == NULL) {
*list = llist_create();
}
}
- key = strdup(name);
+ char *key = strdup(name);
if (key == NULL) {
ERROR("plugin: register_callback: strdup failed.");
destroy_callback(cf);
return -1;
}
- le = llist_search(*list, name);
+ llentry_t *le = llist_search(*list, name);
if (le == NULL) {
le = llentry_create(key, cf);
if (le == NULL) {
llist_append(*list, le);
} else {
- callback_func_t *old_cf;
-
- old_cf = le->value;
+ callback_func_t *old_cf = le->value;
le->value = cf;
- WARNING("plugin: register_callback: "
- "a callback named `%s' already exists - "
- "overwriting the old entry!",
- name);
+ P_WARNING("register_callback: "
+ "a callback named `%s' already exists - "
+ "overwriting the old entry!",
+ name);
destroy_callback(old_cf);
sfree(key);
int i;
llentry_t *le;
int n;
- char **keys;
n = llist_size(*list);
if (n == 0) {
- INFO("%s [none]", comment);
+ INFO("%s: [none]", comment);
return;
}
- keys = calloc(n, sizeof(char *));
-
+ char **keys = calloc(n, sizeof(*keys));
if (keys == NULL) {
ERROR("%s: failed to allocate memory for list of callbacks", comment);
-
return;
}
static int create_register_callback(llist_t **list, /* {{{ */
const char *name, void *callback,
user_data_t const *ud) {
- callback_func_t *cf;
- cf = calloc(1, sizeof(*cf));
+ if (name == NULL || callback == NULL)
+ return EINVAL;
+
+ callback_func_t *cf = calloc(1, sizeof(*cf));
if (cf == NULL) {
free_userdata(ud);
ERROR("plugin: create_register_callback: calloc failed.");
- return -1;
+ return ENOMEM;
}
cf->cf_callback = callback;
if (ud == NULL) {
- cf->cf_udata.data = NULL;
- cf->cf_udata.free_func = NULL;
+ cf->cf_udata = (user_data_t){
+ .data = NULL,
+ .free_func = NULL,
+ };
} else {
cf->cf_udata = *ud;
}
/* plugin_load_file loads the shared object "file" and calls its
* "module_register" function. Returns zero on success, non-zero otherwise. */
-static int plugin_load_file(char const *file, _Bool global) {
+static int plugin_load_file(char const *file, bool global) {
int flags = RTLD_NOW;
if (global)
flags |= RTLD_GLOBAL;
if (read_threads != NULL)
return;
- read_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
+ read_threads = calloc(num, sizeof(*read_threads));
if (read_threads == NULL) {
ERROR("plugin: start_read_threads: calloc failed.");
return;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "reader#%" PRIsz, read_threads_num);
+ ssnprintf(name, sizeof(name), "reader#%" PRIu64,
+ (uint64_t)read_threads_num);
set_thread_name(read_threads[read_threads_num], name);
read_threads_num++;
vl->time = cdtime();
/* Fill in the interval from the thread context, if it is zero. */
- if (vl->interval == 0) {
- plugin_ctx_t ctx = plugin_get_ctx();
-
- if (ctx.interval != 0)
- vl->interval = ctx.interval;
- else {
- char name[6 * DATA_MAX_NAME_LEN];
- FORMAT_VL(name, sizeof(name), vl);
- ERROR("plugin_value_list_clone: Unable to determine "
- "interval from context for "
- "value list \"%s\". "
- "This indicates a broken plugin. "
- "Please report this problem to the "
- "collectd mailing list or at "
- "<http://collectd.org/bugs/>.",
- name);
- vl->interval = cf_get_default_interval();
- }
- }
+ if (vl->interval == 0)
+ vl->interval = plugin_get_interval();
return vl;
} /* }}} value_list_t *plugin_value_list_clone */
if (write_threads != NULL)
return;
- write_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
+ write_threads = calloc(num, sizeof(*write_threads));
if (write_threads == NULL) {
ERROR("plugin: start_write_threads: calloc failed.");
return;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "writer#%" PRIsz, write_threads_num);
+ ssnprintf(name, sizeof(name), "writer#%" PRIu64,
+ (uint64_t)write_threads_num);
set_thread_name(write_threads[write_threads_num], name);
write_threads_num++;
INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
pthread_mutex_lock(&write_lock);
- write_loop = 0;
+ write_loop = false;
DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
pthread_cond_broadcast(&write_cond);
pthread_mutex_unlock(&write_lock);
ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
}
-static _Bool plugin_is_loaded(char const *name) {
- int status;
-
+bool plugin_is_loaded(char const *name) {
if (plugins_loaded == NULL)
plugins_loaded =
c_avl_create((int (*)(const void *, const void *))strcasecmp);
assert(plugins_loaded != NULL);
- status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
+ int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
return status == 0;
}
}
#define BUFSIZE 512
-int plugin_load(char const *plugin_name, _Bool global) {
+#ifdef WIN32
+#define SHLIB_SUFFIX ".dll"
+#else
+#define SHLIB_SUFFIX ".so"
+#endif
+int plugin_load(char const *plugin_name, bool global) {
DIR *dh;
const char *dir;
char filename[BUFSIZE] = "";
*/
if ((strcasecmp("perl", plugin_name) == 0) ||
(strcasecmp("python", plugin_name) == 0))
- global = 1;
+ global = true;
- /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
+ /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the
* type when matching the filename */
- status = snprintf(typename, sizeof(typename), "%s.so", plugin_name);
+ status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name);
if ((status < 0) || ((size_t)status >= sizeof(typename))) {
- WARNING("plugin_load: Filename too long: \"%s.so\"", plugin_name);
+ WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"",
+ plugin_name);
return -1;
}
/*
* The `register_*' functions follow
*/
-int plugin_register_config(const char *name,
- int (*callback)(const char *key, const char *val),
- const char **keys, int keys_num) {
+EXPORT int plugin_register_config(const char *name,
+ int (*callback)(const char *key,
+ const char *val),
+ const char **keys, int keys_num) {
cf_register(name, callback, keys, keys_num);
return 0;
} /* int plugin_register_config */
-int plugin_register_complex_config(const char *type,
- int (*callback)(oconfig_item_t *)) {
+EXPORT int plugin_register_complex_config(const char *type,
+ int (*callback)(oconfig_item_t *)) {
return cf_register_complex(type, callback);
} /* int plugin_register_complex_config */
-int plugin_register_init(const char *name, int (*callback)(void)) {
+EXPORT int plugin_register_init(const char *name, int (*callback)(void)) {
return create_register_callback(&list_init, name, (void *)callback, NULL);
} /* plugin_register_init */
le = llist_search(read_list, rf->rf_name);
if (le != NULL) {
pthread_mutex_unlock(&read_lock);
- WARNING("The read function \"%s\" is already registered. "
- "Check for duplicates in your configuration!",
- rf->rf_name);
+ P_WARNING("The read function \"%s\" is already registered. "
+ "Check for duplicates in your configuration!",
+ rf->rf_name);
return EINVAL;
}
return 0;
} /* int plugin_insert_read */
-int plugin_register_read(const char *name, int (*callback)(void)) {
+EXPORT int plugin_register_read(const char *name, int (*callback)(void)) {
read_func_t *rf;
int status;
rf->rf_name = strdup(name);
rf->rf_type = RF_SIMPLE;
rf->rf_interval = plugin_get_interval();
+ rf->rf_ctx.interval = rf->rf_interval;
status = plugin_insert_read(rf);
if (status != 0) {
return status;
} /* int plugin_register_read */
-int plugin_register_complex_read(const char *group, const char *name,
- plugin_read_cb callback, cdtime_t interval,
- user_data_t const *user_data) {
+EXPORT int plugin_register_complex_read(const char *group, const char *name,
+ plugin_read_cb callback,
+ cdtime_t interval,
+ user_data_t const *user_data) {
read_func_t *rf;
int status;
}
rf->rf_ctx = plugin_get_ctx();
+ rf->rf_ctx.interval = rf->rf_interval;
status = plugin_insert_read(rf);
if (status != 0) {
return status;
} /* int plugin_register_complex_read */
-int plugin_register_write(const char *name, plugin_write_cb callback,
- user_data_t const *ud) {
+EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
+ user_data_t const *ud) {
return create_register_callback(&list_write, name, (void *)callback, ud);
} /* int plugin_register_write */
return flush_name;
} /* static char *plugin_flush_callback_name */
-int plugin_register_flush(const char *name, plugin_flush_cb callback,
- user_data_t const *ud) {
+EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback,
+ user_data_t const *ud) {
int status;
plugin_ctx_t ctx = plugin_get_ctx();
/* name = */ flush_name,
/* callback = */ plugin_flush_timeout_callback,
/* interval = */ ctx.flush_interval,
- /* user data = */ &(user_data_t){
- .data = cb, .free_func = plugin_flush_timeout_callback_free,
+ /* user data = */
+ &(user_data_t){
+ .data = cb,
+ .free_func = plugin_flush_timeout_callback_free,
});
sfree(flush_name);
return 0;
} /* int plugin_register_flush */
-int plugin_register_missing(const char *name, plugin_missing_cb callback,
- user_data_t const *ud) {
+EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
+ user_data_t const *ud) {
return create_register_callback(&list_missing, name, (void *)callback, ud);
} /* int plugin_register_missing */
-int plugin_register_shutdown(const char *name, int (*callback)(void)) {
+EXPORT int plugin_register_cache_event(const char *name,
+ plugin_cache_event_cb callback,
+ user_data_t const *ud) {
+
+ if (name == NULL || callback == NULL)
+ return EINVAL;
+
+ char *name_copy = strdup(name);
+ if (name_copy == NULL) {
+ P_ERROR("plugin_register_cache_event: strdup failed.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ if (list_cache_event_num >= 32) {
+ P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
+ "to be registered.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+
+ if (strcmp(name, cef->name) == 0) {
+ P_ERROR("plugin_register_cache_event: a callback named `%s' already "
+ "registered!",
+ name);
+ free_userdata(ud);
+ return -1;
+ }
+ }
+
+ user_data_t user_data;
+ if (ud == NULL) {
+ user_data = (user_data_t){
+ .data = NULL, .free_func = NULL,
+ };
+ } else {
+ user_data = *ud;
+ }
+
+ list_cache_event[list_cache_event_num] =
+ (cache_event_func_t){.callback = callback,
+ .name = name_copy,
+ .user_data = user_data,
+ .plugin_ctx = plugin_get_ctx()};
+ list_cache_event_num++;
+
+ return 0;
+} /* int plugin_register_cache_event */
+
+EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
} /* int plugin_register_shutdown */
data_sets = NULL;
} /* void plugin_free_data_sets */
-int plugin_register_data_set(const data_set_t *ds) {
+EXPORT int plugin_register_data_set(const data_set_t *ds) {
data_set_t *ds_copy;
if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
} /* int plugin_register_data_set */
-int plugin_register_log(const char *name, plugin_log_cb callback,
- user_data_t const *ud) {
+EXPORT int plugin_register_log(const char *name, plugin_log_cb callback,
+ user_data_t const *ud) {
return create_register_callback(&list_log, name, (void *)callback, ud);
} /* int plugin_register_log */
-int plugin_register_notification(const char *name,
- plugin_notification_cb callback,
- user_data_t const *ud) {
+EXPORT int plugin_register_notification(const char *name,
+ plugin_notification_cb callback,
+ user_data_t const *ud) {
return create_register_callback(&list_notification, name, (void *)callback,
ud);
} /* int plugin_register_log */
-int plugin_unregister_config(const char *name) {
+EXPORT int plugin_unregister_config(const char *name) {
cf_unregister(name);
return 0;
} /* int plugin_unregister_config */
-int plugin_unregister_complex_config(const char *name) {
+EXPORT int plugin_unregister_complex_config(const char *name) {
cf_unregister_complex(name);
return 0;
} /* int plugin_unregister_complex_config */
-int plugin_unregister_init(const char *name) {
+EXPORT int plugin_unregister_init(const char *name) {
return plugin_unregister(list_init, name);
}
-int plugin_unregister_read(const char *name) /* {{{ */
+EXPORT int plugin_unregister_read(const char *name) /* {{{ */
{
llentry_t *le;
read_func_t *rf;
return 0;
} /* }}} int plugin_unregister_read */
-void plugin_log_available_writers(void) {
+EXPORT void plugin_log_available_writers(void) {
log_list_callbacks(&list_write, "Available write targets:");
}
return strcmp(rf->rf_group, (const char *)group);
} /* }}} int compare_read_func_group */
-int plugin_unregister_read_group(const char *group) /* {{{ */
+EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */
{
llentry_t *le;
read_func_t *rf;
return 0;
} /* }}} int plugin_unregister_read_group */
-int plugin_unregister_write(const char *name) {
+EXPORT int plugin_unregister_write(const char *name) {
return plugin_unregister(list_write, name);
}
-int plugin_unregister_flush(const char *name) {
+EXPORT int plugin_unregister_flush(const char *name) {
plugin_ctx_t ctx = plugin_get_ctx();
if (ctx.flush_interval != 0) {
return plugin_unregister(list_flush, name);
}
-int plugin_unregister_missing(const char *name) {
+EXPORT int plugin_unregister_missing(const char *name) {
return plugin_unregister(list_missing, name);
}
-int plugin_unregister_shutdown(const char *name) {
+EXPORT int plugin_unregister_cache_event(const char *name) {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ if (strcmp(name, cef->name) == 0) {
+ /* Mark callback as inactive, so mask in cache entries remains actual */
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+ }
+ return 0;
+}
+
+static void destroy_cache_event_callbacks() {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+}
+
+EXPORT int plugin_unregister_shutdown(const char *name) {
return plugin_unregister(list_shutdown, name);
}
-int plugin_unregister_data_set(const char *name) {
+EXPORT int plugin_unregister_data_set(const char *name) {
data_set_t *ds;
if (data_sets == NULL)
return 0;
} /* int plugin_unregister_data_set */
-int plugin_unregister_log(const char *name) {
+EXPORT int plugin_unregister_log(const char *name) {
return plugin_unregister(list_log, name);
}
-int plugin_unregister_notification(const char *name) {
+EXPORT int plugin_unregister_notification(const char *name) {
return plugin_unregister(list_notification, name);
}
-int plugin_init_all(void) {
+EXPORT int plugin_init_all(void) {
char const *chain_name;
llentry_t *le;
int status;
uc_init();
if (IS_TRUE(global_option_get("CollectInternalStats"))) {
- record_statistics = 1;
+ record_statistics = true;
plugin_register_read("collectd", plugin_update_internal_statistics);
}
} /* void plugin_init_all */
/* TODO: Rename this function. */
-void plugin_read_all(void) {
+EXPORT void plugin_read_all(void) {
uc_check_timeout();
return;
} /* void plugin_read_all */
/* Read function called when the `-T' command line argument is given. */
-int plugin_read_all_once(void) {
+EXPORT int plugin_read_all_once(void) {
int status;
int return_status = 0;
return return_status;
} /* int plugin_read_all_once */
-int plugin_write(const char *plugin, /* {{{ */
- const data_set_t *ds, const value_list_t *vl) {
+EXPORT int plugin_write(const char *plugin, /* {{{ */
+ const data_set_t *ds, const value_list_t *vl) {
llentry_t *le;
int status;
callback_func_t *cf = le->value;
plugin_write_cb callback;
- /* do not switch plugin context; rather keep the context (interval)
- * information of the calling read plugin */
+ /* Keep the read plugin's interval and flush information but update the
+ * plugin name. */
+ plugin_ctx_t old_ctx = plugin_get_ctx();
+ plugin_ctx_t ctx = old_ctx;
+ ctx.name = cf->cf_ctx.name;
+ plugin_set_ctx(ctx);
DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
callback = cf->cf_callback;
else
success++;
+ plugin_set_ctx(old_ctx);
le = le->next;
}
return status;
} /* }}} int plugin_write */
-int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) {
+EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
+ const char *identifier) {
llentry_t *le;
if (list_flush == NULL)
return 0;
} /* int plugin_flush */
-int plugin_shutdown_all(void) {
+EXPORT int plugin_shutdown_all(void) {
llentry_t *le;
int ret = 0; // Assume success.
* the data isn't freed twice. */
destroy_all_callbacks(&list_flush);
destroy_all_callbacks(&list_missing);
+ destroy_cache_event_callbacks();
destroy_all_callbacks(&list_write);
destroy_all_callbacks(&list_notification);
return ret;
} /* void plugin_shutdown_all */
-int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
+EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
{
if (list_missing == NULL)
return 0;
return 0;
} /* int }}} plugin_dispatch_missing */
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+ unsigned long callbacks_mask, const char *name,
+ const value_list_t *vl) {
+ switch (event_type) {
+ case CE_VALUE_NEW:
+ callbacks_mask = 0;
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event NEW.",
+ cef->name, status);
+ } else {
+ if (event.ret) {
+ DEBUG(
+ "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
+ cef->name, name);
+ callbacks_mask |= (1 << (i));
+ } else {
+ DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
+ cef->name, name);
+ }
+ }
+ }
+
+ if (callbacks_mask)
+ uc_set_callbacks_mask(name, callbacks_mask);
+
+ break;
+ case CE_VALUE_UPDATE:
+ case CE_VALUE_EXPIRED:
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ if (callbacks_mask && (1 << (i)) == 0)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event %s.",
+ cef->name, status,
+ ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
+ }
+ }
+ break;
+ }
+ return;
+}
+
static int plugin_dispatch_values_internal(value_list_t *vl) {
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
- _Bool free_meta_data = 0;
+ bool free_meta_data = false;
assert(vl != NULL);
* this case matches and targets may add some and the calling function
* may not expect (and therefore free) that data. */
if (vl->meta == NULL)
- free_meta_data = 1;
+ free_meta_data = true;
if (list_write == NULL)
c_complain_once(LOG_WARNING, &no_write_complaint,
} else
fc_default_action(ds, vl);
- if ((free_meta_data != 0) && (vl->meta != NULL)) {
+ if ((free_meta_data == true) && (vl->meta != NULL)) {
meta_data_destroy(vl->meta);
vl->meta = NULL;
}
return (double)pos / (double)size;
} /* }}} double get_drop_probability */
-static _Bool check_drop_value(void) /* {{{ */
+static bool check_drop_value(void) /* {{{ */
{
- static cdtime_t last_message_time = 0;
+ static cdtime_t last_message_time;
static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
double p;
int status;
if (write_limit_high == 0)
- return 0;
+ return false;
p = get_drop_probability();
if (p == 0.0)
- return 0;
+ return false;
status = pthread_mutex_trylock(&last_message_lock);
if (status == 0) {
}
if (p == 1.0)
- return 1;
+ return true;
q = cdrand_d();
if (q > p)
- return 1;
+ return true;
else
- return 0;
-} /* }}} _Bool check_drop_value */
+ return false;
+} /* }}} bool check_drop_value */
-int plugin_dispatch_values(value_list_t const *vl) {
+EXPORT int plugin_dispatch_values(value_list_t const *vl) {
int status;
- static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
if (check_drop_value()) {
if (record_statistics) {
__attribute__((sentinel)) int
plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
- _Bool store_percentage, int store_type, ...) {
+ bool store_percentage, int store_type, ...) {
value_list_t *vl;
int failed = 0;
gauge_t sum = 0.0;
va_list ap;
+ if (check_drop_value()) {
+ if (record_statistics) {
+ pthread_mutex_lock(&statistics_lock);
+ stats_values_dropped++;
+ pthread_mutex_unlock(&statistics_lock);
+ }
+ return 0;
+ }
+
assert(template->values_len == 1);
/* Calculate sum for Gauge to calculate percent if needed */
return failed;
} /* }}} int plugin_dispatch_multivalue */
-int plugin_dispatch_notification(const notification_t *notif) {
+EXPORT int plugin_dispatch_notification(const notification_t *notif) {
llentry_t *le;
/* Possible TODO: Add flap detection here */
return 0;
} /* int plugin_dispatch_notification */
-void plugin_log(int level, const char *format, ...) {
+EXPORT void plugin_log(int level, const char *format, ...) {
char msg[1024];
va_list ap;
llentry_t *le;
}
} /* void plugin_log */
+void daemon_log(int level, const char *format, ...) {
+ char msg[1024] = ""; // Size inherits from plugin_log()
+
+ char const *name = plugin_get_ctx().name;
+ if (name == NULL)
+ name = "UNKNOWN";
+
+ va_list ap;
+ va_start(ap, format);
+ vsnprintf(msg, sizeof(msg), format, ap);
+ va_end(ap);
+
+ plugin_log(level, "%s plugin: %s", name, msg);
+} /* void daemon_log */
+
int parse_log_severity(const char *severity) {
int log_level = -1;
return log_level;
} /* int parse_log_severity */
-int parse_notif_severity(const char *severity) {
+EXPORT int parse_notif_severity(const char *severity) {
int notif_severity = -1;
if (strcasecmp(severity, "FAILURE") == 0)
return notif_severity;
} /* int parse_notif_severity */
-const data_set_t *plugin_get_ds(const char *name) {
+EXPORT const data_set_t *plugin_get_ds(const char *name) {
data_set_t *ds;
if (data_sets == NULL) {
- ERROR("plugin_get_ds: No data sets are defined yet.");
+ P_ERROR("plugin_get_ds: No data sets are defined yet.");
return NULL;
}
break;
}
case NM_TYPE_BOOLEAN: {
- meta->nm_value.nm_boolean = *((_Bool *)value);
+ meta->nm_value.nm_boolean = *((bool *)value);
break;
}
default: {
}
int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
- _Bool value) {
+ bool value) {
return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value);
}
return ctx;
} /* int plugin_ctx_create */
-void plugin_init_ctx(void) {
+EXPORT void plugin_init_ctx(void) {
pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
- plugin_ctx_key_initialized = 1;
+ plugin_ctx_key_initialized = true;
} /* void plugin_init_ctx */
-plugin_ctx_t plugin_get_ctx(void) {
+EXPORT plugin_ctx_t plugin_get_ctx(void) {
plugin_ctx_t *ctx;
assert(plugin_ctx_key_initialized);
return *ctx;
} /* plugin_ctx_t plugin_get_ctx */
-plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
+EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
plugin_ctx_t *c;
plugin_ctx_t old;
return old;
} /* void plugin_set_ctx */
-cdtime_t plugin_get_interval(void) {
+EXPORT cdtime_t plugin_get_interval(void) {
cdtime_t interval;
interval = plugin_get_ctx().interval;
if (interval > 0)
return interval;
+ P_ERROR("plugin_get_interval: Unable to determine Interval from context.");
+
return cf_get_default_interval();
} /* cdtime_t plugin_get_interval */