* Sebastian Harl <sh at tokkee.org>
**/
+/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
+#define _GNU_SOURCE
+
#include "collectd.h"
#include "common.h"
#include "utils_random.h"
#include "utils_time.h"
-#include <ltdl.h>
+#if HAVE_PTHREAD_NP_H
+#include <pthread_np.h> /* for pthread_set_name_np(3) */
+#endif
+
+#include <dlfcn.h>
/*
* Private structures
static char *plugindir = NULL;
#ifndef DEFAULT_MAX_READ_INTERVAL
-#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T(86400)
+#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
#endif
static c_heap_t *read_heap = NULL;
static llist_t *read_list;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
-static int read_threads_num = 0;
+static size_t read_threads_num = 0;
static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
return (plugindir);
}
-static void plugin_update_internal_statistics(void) { /* {{{ */
- derive_t copy_write_queue_length;
- value_list_t vl = VALUE_LIST_INIT;
- value_t values[2];
-
- copy_write_queue_length = write_queue_length;
+static int plugin_update_internal_statistics(void) { /* {{{ */
+ gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
/* Initialize `vl' */
- vl.values = values;
- vl.values_len = 2;
- vl.time = 0;
- sstrncpy(vl.host, hostname_g, sizeof(vl.host));
+ value_list_t vl = VALUE_LIST_INIT;
sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
-
- vl.type_instance[0] = 0;
- vl.values_len = 1;
+ vl.interval = plugin_get_interval();
/* Write queue */
sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
/* Write queue : queue length */
- vl.values[0].gauge = (gauge_t)copy_write_queue_length;
+ vl.values = &(value_t){.gauge = copy_write_queue_length};
+ vl.values_len = 1;
sstrncpy(vl.type, "queue_length", sizeof(vl.type));
vl.type_instance[0] = 0;
plugin_dispatch_values(&vl);
/* Write queue : Values dropped (queue length > low limit) */
- vl.values[0].derive = (derive_t)stats_values_dropped;
+ vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
+ vl.values_len = 1;
sstrncpy(vl.type, "derive", sizeof(vl.type));
sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
plugin_dispatch_values(&vl);
sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
/* Cache : Nb entry in cache tree */
- vl.values[0].gauge = (gauge_t)uc_get_size();
+ vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
+ vl.values_len = 1;
sstrncpy(vl.type, "cache_size", sizeof(vl.type));
vl.type_instance[0] = 0;
plugin_dispatch_values(&vl);
- return;
-} /* }}} void plugin_update_internal_statistics */
+ return 0;
+} /* }}} int plugin_update_internal_statistics */
static void destroy_callback(callback_func_t *cf) /* {{{ */
{
* object, but it will bitch about a shared object not having a
* ``module_register'' symbol..
*/
-static int plugin_load_file(char *file, uint32_t flags) {
- lt_dlhandle dlh;
+static int plugin_load_file(const char *file, _Bool global) {
void (*reg_handle)(void);
- lt_dlinit();
- lt_dlerror(); /* clear errors */
+ int flags = RTLD_NOW;
+ if (global)
+ flags |= RTLD_GLOBAL;
-#if LIBTOOL_VERSION == 2
- if (flags & PLUGIN_FLAGS_GLOBAL) {
- lt_dladvise advise;
- lt_dladvise_init(&advise);
- lt_dladvise_global(&advise);
- dlh = lt_dlopenadvise(file, advise);
- lt_dladvise_destroy(&advise);
- } else {
- dlh = lt_dlopen(file);
- }
-#else /* if LIBTOOL_VERSION == 1 */
- if (flags & PLUGIN_FLAGS_GLOBAL)
- WARNING("plugin_load_file: The global flag is not supported, "
- "libtool 2 is required for this.");
- dlh = lt_dlopen(file);
-#endif
+ void *dlh = dlopen(file, flags);
if (dlh == NULL) {
char errbuf[1024] = "";
ssnprintf(errbuf, sizeof(errbuf),
- "lt_dlopen (\"%s\") failed: %s. "
+ "dlopen (\"%s\") failed: %s. "
"The most common cause for this problem is "
"missing dependencies. Use ldd(1) to check "
"the dependencies of the plugin "
"/ shared object.",
- file, lt_dlerror());
+ file, dlerror());
ERROR("%s", errbuf);
/* Make sure this is printed to STDERR in any case, but also
return (1);
}
- if ((reg_handle = (void (*)(void))lt_dlsym(dlh, "module_register")) == NULL) {
+ reg_handle = (void (*)(void))dlsym(dlh, "module_register");
+ if (reg_handle == NULL) {
WARNING("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
- lt_dlerror());
- lt_dlclose(dlh);
+ dlerror());
+ dlclose(dlh);
return (-1);
}
* pthread_cond_timedwait returns. */
rc = 0;
while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
- struct timespec ts = {0};
-
- CDTIME_T_TO_TIMESPEC(rf->rf_next_read, &ts);
-
- rc = pthread_cond_timedwait(&read_cond, &read_lock, &ts);
+ rc = pthread_cond_timedwait(&read_cond, &read_lock,
+ &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
}
/* Must hold `read_lock' when accessing `rf->rf_type'. */
return ((void *)0);
} /* void *plugin_read_thread */
-static void start_read_threads(int num) {
+#ifdef PTHREAD_MAX_NAMELEN_NP
+#define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
+#else
+#define THREAD_NAME_MAX 16
+#endif
+
+static void set_thread_name(pthread_t tid, char const *name) {
+#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
+
+ /* glibc limits the length of the name and fails if the passed string
+ * is too long, so we truncate it here. */
+ char n[THREAD_NAME_MAX];
+ if (strlen(name) >= THREAD_NAME_MAX)
+ WARNING("set_thread_name(\"%s\"): name too long", name);
+ sstrncpy(n, name, sizeof(n));
+
+#if defined(HAVE_PTHREAD_SETNAME_NP)
+ int status = pthread_setname_np(tid, n);
+ if (status != 0) {
+ char errbuf[1024];
+ ERROR("set_thread_name(\"%s\"): %s", n,
+ sstrerror(status, errbuf, sizeof(errbuf)));
+ }
+#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
+ pthread_set_name_np(tid, n);
+#endif
+
+#endif
+}
+
+static void start_read_threads(size_t num) /* {{{ */
+{
if (read_threads != NULL)
return;
}
read_threads_num = 0;
- for (int i = 0; i < num; i++) {
- if (pthread_create(read_threads + read_threads_num, NULL,
- plugin_read_thread, NULL) == 0) {
- read_threads_num++;
- } else {
- ERROR("plugin: start_read_threads: pthread_create failed.");
+ for (size_t i = 0; i < num; i++) {
+ int status = pthread_create(read_threads + read_threads_num,
+ /* attr = */ NULL, plugin_read_thread,
+ /* arg = */ NULL);
+ if (status != 0) {
+ char errbuf[1024];
+ ERROR("plugin: start_read_threads: pthread_create failed "
+ "with status %i (%s).",
+ status, sstrerror(status, errbuf, sizeof(errbuf)));
return;
}
+
+ char name[THREAD_NAME_MAX];
+ ssnprintf(name, sizeof(name), "reader#%zu", read_threads_num);
+ set_thread_name(read_threads[read_threads_num], name);
+
+ read_threads_num++;
} /* for (i) */
-} /* void start_read_threads */
+} /* }}} void start_read_threads */
static void stop_read_threads(void) {
if (read_threads == NULL)
return;
- INFO("collectd: Stopping %i read threads.", read_threads_num);
+ INFO("collectd: Stopping %zu read threads.", read_threads_num);
pthread_mutex_lock(&read_lock);
read_loop = 0;
pthread_cond_broadcast(&read_cond);
pthread_mutex_unlock(&read_lock);
- for (int i = 0; i < read_threads_num; i++) {
+ for (size_t i = 0; i < read_threads_num; i++) {
if (pthread_join(read_threads[i], NULL) != 0) {
ERROR("plugin: stop_read_threads: pthread_join failed.");
}
return (NULL);
memcpy(vl, vl_orig, sizeof(*vl));
+ if (vl->host[0] == 0)
+ sstrncpy(vl->host, hostname_g, sizeof(vl->host));
+
vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
if (vl->values == NULL) {
plugin_value_list_free(vl);
write_threads_num = 0;
for (size_t i = 0; i < num; i++) {
- int status;
-
- status = pthread_create(write_threads + write_threads_num,
- /* attr = */ NULL, plugin_write_thread,
- /* arg = */ NULL);
+ int status = pthread_create(write_threads + write_threads_num,
+ /* attr = */ NULL, plugin_write_thread,
+ /* arg = */ NULL);
if (status != 0) {
char errbuf[1024];
ERROR("plugin: start_write_threads: pthread_create failed "
return;
}
+ char name[THREAD_NAME_MAX];
+ ssnprintf(name, sizeof(name), "writer#%zu", write_threads_num);
+ set_thread_name(write_threads[write_threads_num], name);
+
write_threads_num++;
} /* for (i) */
} /* }}} void start_write_threads */
}
#define BUFSIZE 512
-int plugin_load(char const *plugin_name, uint32_t flags) {
+int plugin_load(char const *plugin_name, _Bool global) {
DIR *dh;
const char *dir;
char filename[BUFSIZE] = "";
*/
if ((strcasecmp("perl", plugin_name) == 0) ||
(strcasecmp("python", plugin_name) == 0))
- flags |= PLUGIN_FLAGS_GLOBAL;
+ global = 1;
/* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
* type when matching the filename */
continue;
}
- status = plugin_load_file(filename, flags);
+ status = plugin_load_file(filename, global);
if (status == 0) {
/* success */
plugin_mark_loaded(plugin_name);
/* Init the value cache */
uc_init();
- if (IS_TRUE(global_option_get("CollectInternalStats")))
+ if (IS_TRUE(global_option_get("CollectInternalStats"))) {
record_statistics = 1;
+ plugin_register_read("collectd", plugin_update_internal_statistics);
+ }
chain_name = global_option_get("PreCacheChain");
pre_cache_chain = fc_chain_get_by_name(chain_name);
rt = global_option_get("ReadThreads");
num = atoi(rt);
if (num != -1)
- start_read_threads((num > 0) ? num : 5);
+ start_read_threads((num > 0) ? ((size_t)num) : 5);
}
return ret;
} /* void plugin_init_all */
/* TODO: Rename this function. */
void plugin_read_all(void) {
- if (record_statistics) {
- plugin_update_internal_statistics();
- }
uc_check_timeout();
return;
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
- value_t *saved_values;
- int saved_values_len;
-
data_set_t *ds;
- int free_meta_data = 0;
+ _Bool free_meta_data = 0;
+
+ assert(vl != NULL);
- assert(vl);
- assert(vl->plugin);
+ /* These fields are initialized by plugin_value_list_clone() if needed: */
+ assert(vl->host[0] != 0);
+ assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
+ assert(vl->interval != 0);
if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
ERROR("plugin_dispatch_values: Invalid value list "
return (-1);
}
- /* Assured by plugin_value_list_clone(). The time is determined at
- * _enqueue_ time. */
- assert(vl->time != 0);
- assert(vl->interval != 0);
-
DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
"host = %s; "
"plugin = %s; plugin_instance = %s; "
escape_slashes(vl->type, sizeof(vl->type));
escape_slashes(vl->type_instance, sizeof(vl->type_instance));
- /* Copy the values. This way, we can assure `targets' that they get
- * dynamically allocated values, which they can free and replace if
- * they like. */
- if ((pre_cache_chain != NULL) || (post_cache_chain != NULL)) {
- saved_values = vl->values;
- saved_values_len = vl->values_len;
-
- vl->values = (value_t *)calloc(vl->values_len, sizeof(*vl->values));
- if (vl->values == NULL) {
- ERROR("plugin_dispatch_values: calloc failed.");
- vl->values = saved_values;
- return (-1);
- }
- memcpy(vl->values, saved_values, vl->values_len * sizeof(*vl->values));
- } else /* if ((pre == NULL) && (post == NULL)) */
- {
- saved_values = NULL;
- saved_values_len = 0;
- }
-
if (pre_cache_chain != NULL) {
status = fc_process_chain(ds, vl, pre_cache_chain);
if (status < 0) {
"pre-cache chain failed with "
"status %i (%#x).",
status, status);
- } else if (status == FC_TARGET_STOP) {
- /* Restore the state of the value_list so that plugins
- * don't get confused.. */
- if (saved_values != NULL) {
- sfree(vl->values);
- vl->values = saved_values;
- vl->values_len = saved_values_len;
- }
+ } else if (status == FC_TARGET_STOP)
return (0);
- }
}
/* Update the value cache */
} else
fc_default_action(ds, vl);
- /* Restore the state of the value_list so that plugins don't get
- * confused.. */
- if (saved_values != NULL) {
- sfree(vl->values);
- vl->values = saved_values;
- vl->values_len = saved_values_len;
- }
-
if ((free_meta_data != 0) && (vl->meta != NULL)) {
meta_data_destroy(vl->meta);
vl->meta = NULL;
} /* void *plugin_thread_start */
int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
- void *(*start_routine)(void *), void *arg) {
+ void *(*start_routine)(void *), void *arg,
+ char const *name) {
plugin_thread_t *plugin_thread;
plugin_thread = malloc(sizeof(*plugin_thread));
if (plugin_thread == NULL)
- return -1;
+ return ENOMEM;
plugin_thread->ctx = plugin_get_ctx();
plugin_thread->start_routine = start_routine;
plugin_thread->arg = arg;
- return pthread_create(thread, attr, plugin_thread_start, plugin_thread);
-} /* int plugin_thread_create */
+ int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
+ if (ret != 0) {
+ sfree(plugin_thread);
+ return ret;
+ }
+
+ if (name != NULL)
+ set_thread_name(*thread, name);
-/* vim: set sw=8 ts=8 noet fdm=marker : */
+ return 0;
+} /* int plugin_thread_create */