* Sebastian Harl <sh at tokkee.org>
**/
+/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
+#define _GNU_SOURCE
+
#include "collectd.h"
+
#include "common.h"
#include "plugin.h"
#include "configfile.h"
#include "utils_time.h"
#include "utils_random.h"
+#if HAVE_PTHREAD_NP_H
+# include <pthread_np.h> /* for pthread_set_name_np(3) */
+#endif
+
#include <ltdl.h>
/*
static char *plugindir = NULL;
#ifndef DEFAULT_MAX_READ_INTERVAL
-# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
+# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC (86400)
#endif
static c_heap_t *read_heap = NULL;
static llist_t *read_list;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
-static int read_threads_num = 0;
+static size_t read_threads_num = 0;
static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
}
static void plugin_update_internal_statistics (void) { /* {{{ */
- derive_t copy_write_queue_length;
- value_list_t vl = VALUE_LIST_INIT;
- value_t values[2];
- copy_write_queue_length = write_queue_length;
+ gauge_t copy_write_queue_length = (gauge_t) write_queue_length;
/* Initialize `vl' */
- vl.values = values;
- vl.values_len = 2;
- vl.time = 0;
+ value_list_t vl = VALUE_LIST_INIT;
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
- vl.type_instance[0] = 0;
- vl.values_len = 1;
-
/* Write queue */
sstrncpy (vl.plugin_instance, "write_queue",
sizeof (vl.plugin_instance));
/* Write queue : queue length */
- vl.values[0].gauge = (gauge_t) copy_write_queue_length;
+ vl.values = &(value_t) { .gauge = copy_write_queue_length };
+ vl.values_len = 1;
sstrncpy (vl.type, "queue_length", sizeof (vl.type));
vl.type_instance[0] = 0;
plugin_dispatch_values (&vl);
/* Write queue : Values dropped (queue length > low limit) */
- vl.values[0].derive = (derive_t) stats_values_dropped;
+ vl.values = &(value_t) { .gauge = (gauge_t) stats_values_dropped };
+ vl.values_len = 1;
sstrncpy (vl.type, "derive", sizeof (vl.type));
sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
plugin_dispatch_values (&vl);
sizeof (vl.plugin_instance));
/* Cache : Nb entry in cache tree */
- vl.values[0].gauge = (gauge_t) uc_get_size();
+ vl.values = &(value_t) { .gauge = (gauge_t) uc_get_size() };
+ vl.values_len = 1;
sstrncpy (vl.type, "cache_size", sizeof (vl.type));
vl.type_instance[0] = 0;
plugin_dispatch_values (&vl);
{
char *str;
int len;
- llentry_t *le;
int i;
+ llentry_t *le;
int n;
char **keys;
} /* }}} void log_list_callbacks */
static int create_register_callback (llist_t **list, /* {{{ */
- const char *name, void *callback, user_data_t *ud)
+ const char *name, void *callback, user_data_t const *ud)
{
callback_func_t *cf;
&& (cdtime () < rf->rf_next_read)
&& rc == 0)
{
- struct timespec ts = { 0 };
-
- CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
-
rc = pthread_cond_timedwait (&read_cond, &read_lock,
- &ts);
+ &CDTIME_T_TO_TIMESPEC (rf->rf_next_read));
}
/* Must hold `read_lock' when accessing `rf->rf_type'. */
return ((void *) 0);
} /* void *plugin_read_thread */
-static void start_read_threads (int num)
-{
- int i;
+#ifdef PTHREAD_MAX_NAMELEN_NP
+# define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
+#else
+# define THREAD_NAME_MAX 16
+#endif
+
+static void set_thread_name(pthread_t tid, char const *name) {
+#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
+ /* glibc limits the length of the name and fails if the passed string
+ * is too long, so we truncate it here. */
+ char n[THREAD_NAME_MAX];
+ if (strlen (name) >= THREAD_NAME_MAX)
+ WARNING("set_thread_name(\"%s\"): name too long", name);
+ sstrncpy (n, name, sizeof(n));
+
+#if defined(HAVE_PTHREAD_SETNAME_NP)
+ int status = pthread_setname_np (tid, n);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("set_thread_name(\"%s\"): %s", n,
+ sstrerror (status, errbuf, sizeof(errbuf)));
+ }
+#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
+ pthread_set_name_np (tid, n);
+#endif
+
+#endif
+}
+
+static void start_read_threads (size_t num) /* {{{ */
+{
if (read_threads != NULL)
return;
}
read_threads_num = 0;
- for (i = 0; i < num; i++)
+ for (size_t i = 0; i < num; i++)
{
- if (pthread_create (read_threads + read_threads_num, NULL,
- plugin_read_thread, NULL) == 0)
- {
- read_threads_num++;
- }
- else
+ int status = pthread_create (read_threads + read_threads_num,
+ /* attr = */ NULL,
+ plugin_read_thread,
+ /* arg = */ NULL);
+ if (status != 0)
{
- ERROR ("plugin: start_read_threads: pthread_create failed.");
+ char errbuf[1024];
+ ERROR ("plugin: start_read_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
return;
}
+
+ char name[THREAD_NAME_MAX];
+ ssnprintf (name, sizeof (name), "reader#%zu", read_threads_num);
+ set_thread_name (read_threads[read_threads_num], name);
+
+ read_threads_num++;
} /* for (i) */
-} /* void start_read_threads */
+} /* }}} void start_read_threads */
static void stop_read_threads (void)
{
- int i;
-
if (read_threads == NULL)
return;
- INFO ("collectd: Stopping %i read threads.", read_threads_num);
+ INFO ("collectd: Stopping %zu read threads.", read_threads_num);
pthread_mutex_lock (&read_lock);
read_loop = 0;
pthread_cond_broadcast (&read_cond);
pthread_mutex_unlock (&read_lock);
- for (i = 0; i < read_threads_num; i++)
+ for (size_t i = 0; i < read_threads_num; i++)
{
if (pthread_join (read_threads[i], NULL) != 0)
{
return (NULL);
memcpy (vl, vl_orig, sizeof (*vl));
+ if (vl->host[0] == 0)
+ sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+
vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
if (vl->values == NULL)
{
static void start_write_threads (size_t num) /* {{{ */
{
- size_t i;
-
if (write_threads != NULL)
return;
}
write_threads_num = 0;
- for (i = 0; i < num; i++)
+ for (size_t i = 0; i < num; i++)
{
- int status;
-
- status = pthread_create (write_threads + write_threads_num,
+ int status = pthread_create (write_threads + write_threads_num,
/* attr = */ NULL,
plugin_write_thread,
/* arg = */ NULL);
return;
}
+ char name[THREAD_NAME_MAX];
+ ssnprintf (name, sizeof (name), "writer#%zu", write_threads_num);
+ set_thread_name (write_threads[write_threads_num], name);
+
write_threads_num++;
} /* for (i) */
} /* }}} void start_write_threads */
int status;
if (plugins_loaded == NULL)
- plugins_loaded = c_avl_create ((void *) strcasecmp);
+ plugins_loaded = c_avl_create ((int (*) (const void *, const void *)) strcasecmp);
assert (plugins_loaded != NULL);
status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
int plugin_register_complex_read (const char *group, const char *name,
plugin_read_cb callback,
cdtime_t interval,
- user_data_t *user_data)
+ user_data_t const *user_data)
{
read_func_t *rf;
int status;
} /* int plugin_register_complex_read */
int plugin_register_write (const char *name,
- plugin_write_cb callback, user_data_t *ud)
+ plugin_write_cb callback, user_data_t const *ud)
{
return (create_register_callback (&list_write, name,
(void *) callback, ud));
} /* static char *plugin_flush_callback_name */
int plugin_register_flush (const char *name,
- plugin_flush_cb callback, user_data_t *ud)
+ plugin_flush_cb callback, user_data_t const *ud)
{
int status;
plugin_ctx_t ctx = plugin_get_ctx ();
}
cb->timeout = ctx.flush_timeout;
- ud->data = cb;
- ud->free_func = plugin_flush_timeout_callback_free;
-
status = plugin_register_complex_read (
/* group = */ "flush",
/* name = */ flush_name,
/* callback = */ plugin_flush_timeout_callback,
/* interval = */ ctx.flush_interval,
- /* user data = */ ud);
+ /* user data = */ &(user_data_t) {
+ .data = cb,
+ .free_func = plugin_flush_timeout_callback_free,
+ });
sfree (flush_name);
if (status != 0)
} /* int plugin_register_flush */
int plugin_register_missing (const char *name,
- plugin_missing_cb callback, user_data_t *ud)
+ plugin_missing_cb callback, user_data_t const *ud)
{
return (create_register_callback (&list_missing, name,
(void *) callback, ud));
int plugin_register_data_set (const data_set_t *ds)
{
data_set_t *ds_copy;
- size_t i;
if ((data_sets != NULL)
&& (c_avl_get (data_sets, ds->type, NULL) == 0))
return (-1);
}
- for (i = 0; i < ds->ds_num; i++)
+ for (size_t i = 0; i < ds->ds_num; i++)
memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
} /* int plugin_register_data_set */
int plugin_register_log (const char *name,
- plugin_log_cb callback, user_data_t *ud)
+ plugin_log_cb callback, user_data_t const *ud)
{
return (create_register_callback (&list_log, name,
(void *) callback, ud));
} /* int plugin_register_log */
int plugin_register_notification (const char *name,
- plugin_notification_cb callback, user_data_t *ud)
+ plugin_notification_cb callback, user_data_t const *ud)
{
return (create_register_callback (&list_notification, name,
(void *) callback, ud));
return (plugin_unregister (list_notification, name));
}
-void plugin_init_all (void)
+int plugin_init_all (void)
{
char const *chain_name;
llentry_t *le;
int status;
+ int ret = 0;
/* Init the value cache */
uc_init ();
}
if ((list_init == NULL) && (read_heap == NULL))
- return;
+ return ret;
/* Calling all init callbacks before checking if read callbacks
* are available allows the init callbacks to register the read
* handling themselves. */
/* FIXME: Unload _all_ functions */
plugin_unregister_read (le->key);
+ ret = -1;
}
le = le->next;
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
- start_read_threads ((num > 0) ? num : 5);
+ start_read_threads ((num > 0) ? ((size_t) num) : 5);
}
+ return ret;
} /* void plugin_init_all */
/* TODO: Rename this function. */
return (0);
} /* int plugin_flush */
-void plugin_shutdown_all (void)
+int plugin_shutdown_all (void)
{
llentry_t *le;
-
- stop_read_threads ();
+ int ret = 0; // Assume success.
destroy_all_callbacks (&list_init);
+ stop_read_threads ();
+
pthread_mutex_lock (&read_lock);
llist_destroy (read_list);
read_list = NULL;
destroy_read_heap ();
+ /* blocks until all write threads have shut down. */
+ stop_write_threads ();
+
+ /* ask all plugins to write out the state they kept. */
plugin_flush (/* plugin = */ NULL,
/* timeout = */ 0,
/* identifier = */ NULL);
* after callback returns. */
le = le->next;
- (*callback) ();
+ if ((*callback) () != 0)
+ ret = -1;
plugin_set_ctx (old_ctx);
}
- stop_write_threads ();
-
/* Write plugins which use the `user_data' pointer usually need the
* same data available to the flush callback. If this is the case, set
* the free_function to NULL when registering the flush callback and to
plugin_free_loaded ();
plugin_free_data_sets ();
+ return (ret);
} /* void plugin_shutdown_all */
int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
- value_t *saved_values;
- int saved_values_len;
-
data_set_t *ds;
- int free_meta_data = 0;
+ _Bool free_meta_data = 0;
- assert(vl);
- assert(vl->plugin);
+ assert (vl != NULL);
+
+ /* These fields are initialized by plugin_value_list_clone() if needed: */
+ assert (vl->host[0] != 0);
+ assert (vl->time != 0); /* The time is determined at _enqueue_ time. */
+ assert (vl->interval != 0);
if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1)
{
return (-1);
}
- /* Assured by plugin_value_list_clone(). The time is determined at
- * _enqueue_ time. */
- assert (vl->time != 0);
- assert (vl->interval != 0);
-
DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
"host = %s; "
"plugin = %s; plugin_instance = %s; "
escape_slashes (vl->type, sizeof (vl->type));
escape_slashes (vl->type_instance, sizeof (vl->type_instance));
- /* Copy the values. This way, we can assure `targets' that they get
- * dynamically allocated values, which they can free and replace if
- * they like. */
- if ((pre_cache_chain != NULL) || (post_cache_chain != NULL))
- {
- saved_values = vl->values;
- saved_values_len = vl->values_len;
-
- vl->values = (value_t *) calloc (vl->values_len,
- sizeof (*vl->values));
- if (vl->values == NULL)
- {
- ERROR ("plugin_dispatch_values: calloc failed.");
- vl->values = saved_values;
- return (-1);
- }
- memcpy (vl->values, saved_values,
- vl->values_len * sizeof (*vl->values));
- }
- else /* if ((pre == NULL) && (post == NULL)) */
- {
- saved_values = NULL;
- saved_values_len = 0;
- }
-
if (pre_cache_chain != NULL)
{
status = fc_process_chain (ds, vl, pre_cache_chain);
status, status);
}
else if (status == FC_TARGET_STOP)
- {
- /* Restore the state of the value_list so that plugins
- * don't get confused.. */
- if (saved_values != NULL)
- {
- sfree (vl->values);
- vl->values = saved_values;
- vl->values_len = saved_values_len;
- }
return (0);
- }
}
/* Update the value cache */
else
fc_default_action (ds, vl);
- /* Restore the state of the value_list so that plugins don't get
- * confused.. */
- if (saved_values != NULL)
- {
- sfree (vl->values);
- vl->values = saved_values;
- vl->values_len = saved_values_len;
- }
-
if ((free_meta_data != 0) && (vl->meta != NULL))
{
meta_data_destroy (vl->meta);
case DS_TYPE_GAUGE:
vl->values[0].gauge = va_arg (ap, gauge_t);
if (store_percentage)
- vl->values[0].gauge *= sum ? (100.0 / sum) : 0;
+ vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
break;
case DS_TYPE_ABSOLUTE:
vl->values[0].absolute = va_arg (ap, absolute_t);
int plugin_notification_meta_copy (notification_t *dst,
const notification_t *src)
{
- notification_meta_t *meta;
-
assert (dst != NULL);
assert (src != NULL);
assert (dst != src);
assert ((src->meta == NULL) || (src->meta != dst->meta));
- for (meta = src->meta; meta != NULL; meta = meta->next)
+ for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next)
{
if (meta->type == NM_TYPE_STRING)
plugin_notification_meta_add_string (dst, meta->name,
} /* void *plugin_thread_start */
int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
- void *(*start_routine) (void *), void *arg)
+ void *(*start_routine) (void *), void *arg, char const *name)
{
plugin_thread_t *plugin_thread;
plugin_thread = malloc (sizeof (*plugin_thread));
if (plugin_thread == NULL)
- return -1;
+ return ENOMEM;
plugin_thread->ctx = plugin_get_ctx ();
plugin_thread->start_routine = start_routine;
plugin_thread->arg = arg;
- return pthread_create (thread, attr,
+ int ret = pthread_create (thread, attr,
plugin_thread_start, plugin_thread);
+ if (ret != 0)
+ {
+ sfree (plugin_thread);
+ return ret;
+ }
+
+ if (name != NULL)
+ set_thread_name (*thread, name);
+
+ return 0;
} /* int plugin_thread_create */
/* vim: set sw=8 ts=8 noet fdm=marker : */