X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fdaemon%2Fplugin.c;h=c8fc15a14866c9982b9ba3b6f9000b959545a2cb;hb=295947714f23935be771c98f1071564d5567d33a;hp=7a503c718d13d37c2a18b75f95236837f513f106;hpb=79963d13c1884d1d92667cc502ad20758b084a12;p=collectd.git diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index 7a503c71..c8fc15a1 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -25,6 +25,9 @@ * Sebastian Harl **/ +/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */ +#define _GNU_SOURCE + #include "collectd.h" #include "common.h" @@ -39,7 +42,11 @@ #include "utils_random.h" #include "utils_time.h" -#include +#if HAVE_PTHREAD_NP_H +#include /* for pthread_set_name_np(3) */ +#endif + +#include /* * Private structures @@ -105,7 +112,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; #ifndef DEFAULT_MAX_READ_INTERVAL -#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T(86400) +#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400) #endif static c_heap_t *read_heap = NULL; static llist_t *read_list; @@ -113,7 +120,7 @@ static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; -static int read_threads_num = 0; +static size_t read_threads_num = 0; static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL; static write_queue_t *write_queue_head; @@ -147,33 +154,27 @@ static const char *plugin_get_dir(void) { } static void plugin_update_internal_statistics(void) { /* {{{ */ - derive_t copy_write_queue_length; - value_list_t vl = VALUE_LIST_INIT; - value_t values[2]; - copy_write_queue_length = write_queue_length; + gauge_t copy_write_queue_length = (gauge_t)write_queue_length; /* Initialize `vl' */ - vl.values = values; - vl.values_len = 2; - vl.time = 0; + value_list_t vl = VALUE_LIST_INIT; sstrncpy(vl.host, hostname_g, sizeof(vl.host)); sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin)); - vl.type_instance[0] = 0; - vl.values_len = 1; - /* Write queue */ sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance)); /* Write queue : queue length */ - vl.values[0].gauge = (gauge_t)copy_write_queue_length; + vl.values = &(value_t){.gauge = copy_write_queue_length}; + vl.values_len = 1; sstrncpy(vl.type, "queue_length", sizeof(vl.type)); vl.type_instance[0] = 0; plugin_dispatch_values(&vl); /* Write queue : Values dropped (queue length > low limit) */ - vl.values[0].derive = (derive_t)stats_values_dropped; + vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped}; + vl.values_len = 1; sstrncpy(vl.type, "derive", sizeof(vl.type)); sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance)); plugin_dispatch_values(&vl); @@ -182,7 +183,8 @@ static void plugin_update_internal_statistics(void) { /* {{{ */ sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance)); /* Cache : Nb entry in cache tree */ - vl.values[0].gauge = (gauge_t)uc_get_size(); + vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()}; + vl.values_len = 1; sstrncpy(vl.type, "cache_size", sizeof(vl.type)); vl.type_instance[0] = 0; plugin_dispatch_values(&vl); @@ -387,40 +389,25 @@ static int plugin_unregister(llist_t *list, const char *name) /* {{{ */ * object, but it will bitch about a shared object not having a * ``module_register'' symbol.. */ -static int plugin_load_file(char *file, uint32_t flags) { - lt_dlhandle dlh; +static int plugin_load_file(const char *file, _Bool global) { void (*reg_handle)(void); - lt_dlinit(); - lt_dlerror(); /* clear errors */ + int flags = RTLD_NOW; + if (global) + flags |= RTLD_GLOBAL; -#if LIBTOOL_VERSION == 2 - if (flags & PLUGIN_FLAGS_GLOBAL) { - lt_dladvise advise; - lt_dladvise_init(&advise); - lt_dladvise_global(&advise); - dlh = lt_dlopenadvise(file, advise); - lt_dladvise_destroy(&advise); - } else { - dlh = lt_dlopen(file); - } -#else /* if LIBTOOL_VERSION == 1 */ - if (flags & PLUGIN_FLAGS_GLOBAL) - WARNING("plugin_load_file: The global flag is not supported, " - "libtool 2 is required for this."); - dlh = lt_dlopen(file); -#endif + void *dlh = dlopen(file, flags); if (dlh == NULL) { char errbuf[1024] = ""; ssnprintf(errbuf, sizeof(errbuf), - "lt_dlopen (\"%s\") failed: %s. " + "dlopen (\"%s\") failed: %s. " "The most common cause for this problem is " "missing dependencies. Use ldd(1) to check " "the dependencies of the plugin " "/ shared object.", - file, lt_dlerror()); + file, dlerror()); ERROR("%s", errbuf); /* Make sure this is printed to STDERR in any case, but also @@ -431,10 +418,11 @@ static int plugin_load_file(char *file, uint32_t flags) { return (1); } - if ((reg_handle = (void (*)(void))lt_dlsym(dlh, "module_register")) == NULL) { + reg_handle = (void (*)(void))dlsym(dlh, "module_register"); + if (reg_handle == NULL) { WARNING("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file, - lt_dlerror()); - lt_dlclose(dlh); + dlerror()); + dlclose(dlh); return (-1); } @@ -486,11 +474,8 @@ static void *plugin_read_thread(void __attribute__((unused)) * args) { * pthread_cond_timedwait returns. */ rc = 0; while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) { - struct timespec ts = {0}; - - CDTIME_T_TO_TIMESPEC(rf->rf_next_read, &ts); - - rc = pthread_cond_timedwait(&read_cond, &read_lock, &ts); + rc = pthread_cond_timedwait(&read_cond, &read_lock, + &CDTIME_T_TO_TIMESPEC(rf->rf_next_read)); } /* Must hold `read_lock' when accessing `rf->rf_type'. */ @@ -601,7 +586,38 @@ static void *plugin_read_thread(void __attribute__((unused)) * args) { return ((void *)0); } /* void *plugin_read_thread */ -static void start_read_threads(int num) { +#ifdef PTHREAD_MAX_NAMELEN_NP +#define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP +#else +#define THREAD_NAME_MAX 16 +#endif + +static void set_thread_name(pthread_t tid, char const *name) { +#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP) + + /* glibc limits the length of the name and fails if the passed string + * is too long, so we truncate it here. */ + char n[THREAD_NAME_MAX]; + if (strlen(name) >= THREAD_NAME_MAX) + WARNING("set_thread_name(\"%s\"): name too long", name); + sstrncpy(n, name, sizeof(n)); + +#if defined(HAVE_PTHREAD_SETNAME_NP) + int status = pthread_setname_np(tid, n); + if (status != 0) { + char errbuf[1024]; + ERROR("set_thread_name(\"%s\"): %s", n, + sstrerror(status, errbuf, sizeof(errbuf))); + } +#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */ + pthread_set_name_np(tid, n); +#endif + +#endif +} + +static void start_read_threads(size_t num) /* {{{ */ +{ if (read_threads != NULL) return; @@ -612,22 +628,31 @@ static void start_read_threads(int num) { } read_threads_num = 0; - for (int i = 0; i < num; i++) { - if (pthread_create(read_threads + read_threads_num, NULL, - plugin_read_thread, NULL) == 0) { - read_threads_num++; - } else { - ERROR("plugin: start_read_threads: pthread_create failed."); + for (size_t i = 0; i < num; i++) { + int status = pthread_create(read_threads + read_threads_num, + /* attr = */ NULL, plugin_read_thread, + /* arg = */ NULL); + if (status != 0) { + char errbuf[1024]; + ERROR("plugin: start_read_threads: pthread_create failed " + "with status %i (%s).", + status, sstrerror(status, errbuf, sizeof(errbuf))); return; } + + char name[THREAD_NAME_MAX]; + ssnprintf(name, sizeof(name), "reader#%zu", read_threads_num); + set_thread_name(read_threads[read_threads_num], name); + + read_threads_num++; } /* for (i) */ -} /* void start_read_threads */ +} /* }}} void start_read_threads */ static void stop_read_threads(void) { if (read_threads == NULL) return; - INFO("collectd: Stopping %i read threads.", read_threads_num); + INFO("collectd: Stopping %zu read threads.", read_threads_num); pthread_mutex_lock(&read_lock); read_loop = 0; @@ -635,7 +660,7 @@ static void stop_read_threads(void) { pthread_cond_broadcast(&read_cond); pthread_mutex_unlock(&read_lock); - for (int i = 0; i < read_threads_num; i++) { + for (size_t i = 0; i < read_threads_num; i++) { if (pthread_join(read_threads[i], NULL) != 0) { ERROR("plugin: stop_read_threads: pthread_join failed."); } @@ -668,6 +693,9 @@ plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */ return (NULL); memcpy(vl, vl_orig, sizeof(*vl)); + if (vl->host[0] == 0) + sstrncpy(vl->host, hostname_g, sizeof(vl->host)); + vl->values = calloc(vl_orig->values_len, sizeof(*vl->values)); if (vl->values == NULL) { plugin_value_list_free(vl); @@ -808,11 +836,9 @@ static void start_write_threads(size_t num) /* {{{ */ write_threads_num = 0; for (size_t i = 0; i < num; i++) { - int status; - - status = pthread_create(write_threads + write_threads_num, - /* attr = */ NULL, plugin_write_thread, - /* arg = */ NULL); + int status = pthread_create(write_threads + write_threads_num, + /* attr = */ NULL, plugin_write_thread, + /* arg = */ NULL); if (status != 0) { char errbuf[1024]; ERROR("plugin: start_write_threads: pthread_create failed " @@ -821,6 +847,10 @@ static void start_write_threads(size_t num) /* {{{ */ return; } + char name[THREAD_NAME_MAX]; + ssnprintf(name, sizeof(name), "writer#%zu", write_threads_num); + set_thread_name(write_threads[write_threads_num], name); + write_threads_num++; } /* for (i) */ } /* }}} void start_write_threads */ @@ -929,7 +959,7 @@ static void plugin_free_loaded(void) { } #define BUFSIZE 512 -int plugin_load(char const *plugin_name, uint32_t flags) { +int plugin_load(char const *plugin_name, _Bool global) { DIR *dh; const char *dir; char filename[BUFSIZE] = ""; @@ -963,7 +993,7 @@ int plugin_load(char const *plugin_name, uint32_t flags) { */ if ((strcasecmp("perl", plugin_name) == 0) || (strcasecmp("python", plugin_name) == 0)) - flags |= PLUGIN_FLAGS_GLOBAL; + global = 1; /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the * type when matching the filename */ @@ -1001,7 +1031,7 @@ int plugin_load(char const *plugin_name, uint32_t flags) { continue; } - status = plugin_load_file(filename, flags); + status = plugin_load_file(filename, global); if (status == 0) { /* success */ plugin_mark_loaded(plugin_name); @@ -1610,7 +1640,7 @@ int plugin_init_all(void) { rt = global_option_get("ReadThreads"); num = atoi(rt); if (num != -1) - start_read_threads((num > 0) ? num : 5); + start_read_threads((num > 0) ? ((size_t)num) : 5); } return ret; } /* void plugin_init_all */ @@ -1882,15 +1912,16 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; - value_t *saved_values; - int saved_values_len; - data_set_t *ds; - int free_meta_data = 0; + _Bool free_meta_data = 0; - assert(vl); - assert(vl->plugin); + assert(vl != NULL); + + /* These fields are initialized by plugin_value_list_clone() if needed: */ + assert(vl->host[0] != 0); + assert(vl->time != 0); /* The time is determined at _enqueue_ time. */ + assert(vl->interval != 0); if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) { ERROR("plugin_dispatch_values: Invalid value list " @@ -1928,11 +1959,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { return (-1); } - /* Assured by plugin_value_list_clone(). The time is determined at - * _enqueue_ time. */ - assert(vl->time != 0); - assert(vl->interval != 0); - DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " "plugin = %s; plugin_instance = %s; " @@ -1966,26 +1992,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { escape_slashes(vl->type, sizeof(vl->type)); escape_slashes(vl->type_instance, sizeof(vl->type_instance)); - /* Copy the values. This way, we can assure `targets' that they get - * dynamically allocated values, which they can free and replace if - * they like. */ - if ((pre_cache_chain != NULL) || (post_cache_chain != NULL)) { - saved_values = vl->values; - saved_values_len = vl->values_len; - - vl->values = (value_t *)calloc(vl->values_len, sizeof(*vl->values)); - if (vl->values == NULL) { - ERROR("plugin_dispatch_values: calloc failed."); - vl->values = saved_values; - return (-1); - } - memcpy(vl->values, saved_values, vl->values_len * sizeof(*vl->values)); - } else /* if ((pre == NULL) && (post == NULL)) */ - { - saved_values = NULL; - saved_values_len = 0; - } - if (pre_cache_chain != NULL) { status = fc_process_chain(ds, vl, pre_cache_chain); if (status < 0) { @@ -1993,16 +1999,8 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { "pre-cache chain failed with " "status %i (%#x).", status, status); - } else if (status == FC_TARGET_STOP) { - /* Restore the state of the value_list so that plugins - * don't get confused.. */ - if (saved_values != NULL) { - sfree(vl->values); - vl->values = saved_values; - vl->values_len = saved_values_len; - } + } else if (status == FC_TARGET_STOP) return (0); - } } /* Update the value cache */ @@ -2019,14 +2017,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) { } else fc_default_action(ds, vl); - /* Restore the state of the value_list so that plugins don't get - * confused.. */ - if (saved_values != NULL) { - sfree(vl->values); - vl->values = saved_values; - vl->values_len = saved_values_len; - } - if ((free_meta_data != 0) && (vl->meta != NULL)) { meta_data_destroy(vl->meta); vl->meta = NULL; @@ -2565,18 +2555,26 @@ static void *plugin_thread_start(void *arg) { } /* void *plugin_thread_start */ int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr, - void *(*start_routine)(void *), void *arg) { + void *(*start_routine)(void *), void *arg, + char const *name) { plugin_thread_t *plugin_thread; plugin_thread = malloc(sizeof(*plugin_thread)); if (plugin_thread == NULL) - return -1; + return ENOMEM; plugin_thread->ctx = plugin_get_ctx(); plugin_thread->start_routine = start_routine; plugin_thread->arg = arg; - return pthread_create(thread, attr, plugin_thread_start, plugin_thread); -} /* int plugin_thread_create */ + int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread); + if (ret != 0) { + sfree(plugin_thread); + return ret; + } -/* vim: set sw=8 ts=8 noet fdm=marker : */ + if (name != NULL) + set_thread_name(*thread, name); + + return 0; +} /* int plugin_thread_create */