X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=cdd56bd79dd8f2f4d0c6374c7d33fe5ebbe224d7;hb=633c3966f770e4d46651a2fe219a18d8a9907a9f;hp=647c1fb691d4b09bb2b9f74ff61b5958db6b032f;hpb=a841b5f2882acf431f4fd0783f602f3ae80716c5;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 647c1fb6..30a1ff1a 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,43 +1,48 @@ /** * collectd - src/plugin.c - * Copyright (C) 2005-2009 Florian octo Forster + * Copyright (C) 2005-2014 Florian octo Forster * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: - * Florian octo Forster + * Florian octo Forster * Sebastian Harl **/ #include "collectd.h" -#include "utils_complain.h" - -#include - -#if HAVE_PTHREAD_H -# include -#endif - #include "common.h" #include "plugin.h" #include "configfile.h" +#include "filter_chain.h" #include "utils_avltree.h" +#include "utils_cache.h" +#include "utils_complain.h" #include "utils_llist.h" #include "utils_heap.h" -#include "utils_cache.h" -#include "utils_threshold.h" -#include "filter_chain.h" +#include "utils_time.h" +#include "utils_random.h" + +#if HAVE_PTHREAD_H +# include +#endif + +#include /* * Private structures @@ -46,6 +51,7 @@ struct callback_func_s { void *cf_callback; user_data_t cf_udata; + plugin_ctx_t cf_ctx; }; typedef struct callback_func_s callback_func_t; @@ -58,21 +64,35 @@ struct read_func_s * The `rf_super' member MUST be the first one in this structure! */ #define rf_callback rf_super.cf_callback #define rf_udata rf_super.cf_udata +#define rf_ctx rf_super.cf_ctx callback_func_t rf_super; - char rf_name[DATA_MAX_NAME_LEN]; + char rf_group[DATA_MAX_NAME_LEN]; + char *rf_name; int rf_type; - struct timespec rf_interval; - struct timespec rf_effective_interval; - struct timespec rf_next_read; + cdtime_t rf_interval; + cdtime_t rf_effective_interval; + cdtime_t rf_next_read; }; typedef struct read_func_s read_func_t; +struct write_queue_s; +typedef struct write_queue_s write_queue_t; +struct write_queue_s +{ + value_list_t *vl; + plugin_ctx_t ctx; + write_queue_t *next; +}; + /* * Private variables */ +static c_avl_tree_t *plugins_loaded = NULL; + static llist_t *list_init; static llist_t *list_write; static llist_t *list_flush; +static llist_t *list_missing; static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; @@ -92,9 +112,26 @@ static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static write_queue_t *write_queue_head; +static write_queue_t *write_queue_tail; +static long write_queue_length = 0; +static _Bool write_loop = 1; +static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *write_threads = NULL; +static size_t write_threads_num = 0; + +static pthread_key_t plugin_ctx_key; +static _Bool plugin_ctx_key_initialized = 0; + +static long write_limit_high = 0; +static long write_limit_low = 0; + /* * Static functions */ +static int plugin_dispatch_values_internal (value_list_t *vl); + static const char *plugin_get_dir (void) { if (plugindir == NULL) @@ -151,7 +188,7 @@ static void destroy_read_heap (void) /* {{{ */ { callback_func_t *cf; - cf = c_head_get_root (read_heap); + cf = c_heap_get_root (read_heap); if (cf == NULL) break; @@ -173,7 +210,7 @@ static int register_callback (llist_t **list, /* {{{ */ *list = llist_create (); if (*list == NULL) { - ERROR ("plugin: create_register_callback: " + ERROR ("plugin: register_callback: " "llist_create failed."); destroy_callback (cf); return (-1); @@ -183,7 +220,7 @@ static int register_callback (llist_t **list, /* {{{ */ key = strdup (name); if (key == NULL) { - ERROR ("plugin: create_register_callback: strdup failed."); + ERROR ("plugin: register_callback: strdup failed."); destroy_callback (cf); return (-1); } @@ -194,7 +231,7 @@ static int register_callback (llist_t **list, /* {{{ */ le = llentry_create (key, cf); if (le == NULL) { - ERROR ("plugin: create_register_callback: " + ERROR ("plugin: register_callback: " "llentry_create failed."); free (key); destroy_callback (cf); @@ -210,6 +247,10 @@ static int register_callback (llist_t **list, /* {{{ */ old_cf = le->value; le->value = cf; + WARNING ("plugin: register_callback: " + "a callback named `%s' already exists - " + "overwriting the old entry!", name); + destroy_callback (old_cf); sfree (key); } @@ -241,6 +282,8 @@ static int create_register_callback (llist_t **list, /* {{{ */ cf->cf_udata = *ud; } + cf->cf_ctx = plugin_get_ctx (); + return (register_callback (list, name, cf)); } /* }}} int create_register_callback */ @@ -270,28 +313,55 @@ static int plugin_unregister (llist_t *list, const char *name) /* {{{ */ * object, but it will bitch about a shared object not having a * ``module_register'' symbol.. */ -static int plugin_load_file (char *file) +static int plugin_load_file (char *file, uint32_t flags) { lt_dlhandle dlh; void (*reg_handle) (void); - DEBUG ("file = %s", file); - lt_dlinit (); lt_dlerror (); /* clear errors */ - if ((dlh = lt_dlopen (file)) == NULL) +#if LIBTOOL_VERSION == 2 + if (flags & PLUGIN_FLAGS_GLOBAL) { + lt_dladvise advise; + lt_dladvise_init(&advise); + lt_dladvise_global(&advise); + dlh = lt_dlopenadvise(file, advise); + lt_dladvise_destroy(&advise); + } else { + dlh = lt_dlopen (file); + } +#else /* if LIBTOOL_VERSION == 1 */ + if (flags & PLUGIN_FLAGS_GLOBAL) + WARNING ("plugin_load_file: The global flag is not supported, " + "libtool 2 is required for this."); + dlh = lt_dlopen (file); +#endif + + if (dlh == NULL) { - const char *error = lt_dlerror (); + char errbuf[1024] = ""; + + ssnprintf (errbuf, sizeof (errbuf), + "lt_dlopen (\"%s\") failed: %s. " + "The most common cause for this problem are " + "missing dependencies. Use ldd(1) to check " + "the dependencies of the plugin " + "/ shared object.", + file, lt_dlerror ()); + + ERROR ("%s", errbuf); + /* Make sure this is printed to STDERR in any case, but also + * make sure it's printed only once. */ + if (list_log != NULL) + fprintf (stderr, "ERROR: %s\n", errbuf); - ERROR ("lt_dlopen (%s) failed: %s", file, error); - fprintf (stderr, "lt_dlopen (%s) failed: %s\n", file, error); return (1); } if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL) { - WARNING ("Couldn't find symbol `module_register' in `%s': %s\n", + WARNING ("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file, lt_dlerror ()); lt_dlclose (dlh); return (-1); @@ -302,52 +372,40 @@ static int plugin_load_file (char *file) return (0); } -static _Bool timeout_reached(struct timespec timeout) -{ - struct timeval now; - gettimeofday(&now, NULL); - return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); -} - static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) { read_func_t *rf; - struct timeval now; + plugin_ctx_t old_ctx; + cdtime_t now; int status; int rf_type; int rc; - /* Get the read function that needs to be read next. */ - rf = c_head_get_root (read_heap); + /* Get the read function that needs to be read next. + * We don't need to hold "read_lock" for the heap, but we need + * to call c_heap_get_root() and pthread_cond_wait() in the + * same protected block. */ + pthread_mutex_lock (&read_lock); + rf = c_heap_get_root (read_heap); if (rf == NULL) { - struct timespec abstime; - - gettimeofday (&now, /* timezone = */ NULL); - - abstime.tv_sec = now.tv_sec + interval_g; - abstime.tv_nsec = 1000 * now.tv_usec; - - pthread_mutex_lock (&read_lock); - pthread_cond_timedwait (&read_cond, &read_lock, - &abstime); - pthread_mutex_unlock (&read_lock); + pthread_cond_wait (&read_cond, &read_lock); + pthread_mutex_unlock (&read_lock); continue; } + pthread_mutex_unlock (&read_lock); - if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) + if (rf->rf_interval == 0) { - gettimeofday (&now, /* timezone = */ NULL); - - rf->rf_interval.tv_sec = interval_g; - rf->rf_interval.tv_nsec = 0; - + /* this should not happen, because the interval is set + * for each plugin when loading it + * XXX: issue a warning? */ + rf->rf_interval = plugin_get_interval (); rf->rf_effective_interval = rf->rf_interval; - rf->rf_next_read.tv_sec = now.tv_sec; - rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; + rf->rf_next_read = cdtime (); } /* sleep until this entry is due, @@ -358,12 +416,19 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * we need to re-evaluate the condition every time * pthread_cond_timedwait returns. */ rc = 0; - while (!timeout_reached(rf->rf_next_read) && rc == 0) { + while ((read_loop != 0) + && (cdtime () < rf->rf_next_read) + && rc == 0) + { + struct timespec ts = { 0 }; + + CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts); + rc = pthread_cond_timedwait (&read_cond, &read_lock, - &rf->rf_next_read); + &ts); } - /* Must hold `real_lock' when accessing `rf->rf_type'. */ + /* Must hold `read_lock' when accessing `rf->rf_type'. */ rf_type = rf->rf_type; pthread_mutex_unlock (&read_lock); @@ -384,6 +449,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { DEBUG ("plugin_read_thread: Destroying the `%s' " "callback.", rf->rf_name); + sfree (rf->rf_name); destroy_callback ((callback_func_t *) rf); rf = NULL; continue; @@ -391,6 +457,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -408,24 +476,20 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + /* If the function signals failure, we will increase the * intervals in which it will be called. */ if (status != 0) { - rf->rf_effective_interval.tv_sec *= 2; - rf->rf_effective_interval.tv_nsec *= 2; - NORMALIZE_TIMESPEC (rf->rf_effective_interval); - - if (rf->rf_effective_interval.tv_sec >= 86400) - { - rf->rf_effective_interval.tv_sec = 86400; - rf->rf_effective_interval.tv_nsec = 0; - } + rf->rf_effective_interval *= 2; + if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400)) + rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400); NOTICE ("read-function of plugin `%s' failed. " - "Will suspend it for %i seconds.", + "Will suspend it for %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); } else { @@ -434,38 +498,29 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) } /* update the ``next read due'' field */ - gettimeofday (&now, /* timezone = */ NULL); + now = cdtime (); DEBUG ("plugin_read_thread: Effective interval of the " - "%s plugin is %i.%09i.", + "%s plugin is %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec, - (int) rf->rf_effective_interval.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); /* Calculate the next (absolute) time at which this function * should be called. */ - rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec - + rf->rf_effective_interval.tv_sec; - rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec - + rf->rf_effective_interval.tv_nsec; - NORMALIZE_TIMESPEC (rf->rf_next_read); + rf->rf_next_read += rf->rf_effective_interval; /* Check, if `rf_next_read' is in the past. */ - if ((rf->rf_next_read.tv_sec < now.tv_sec) - || ((rf->rf_next_read.tv_sec == now.tv_sec) - && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec)))) + if (rf->rf_next_read < now) { /* `rf_next_read' is in the past. Insert `now' * so this value doesn't trail off into the * past too much. */ - rf->rf_next_read.tv_sec = now.tv_sec; - rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; + rf->rf_next_read = now; } - DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", + DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.", rf->rf_name, - (int) rf->rf_next_read.tv_sec, - (int) rf->rf_next_read.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_next_read)); /* Re-insert this read function into the heap again. */ c_heap_insert (read_heap, rf); @@ -532,6 +587,250 @@ static void stop_read_threads (void) read_threads_num = 0; } /* void stop_read_threads */ +static void plugin_value_list_free (value_list_t *vl) /* {{{ */ +{ + if (vl == NULL) + return; + + meta_data_destroy (vl->meta); + sfree (vl->values); + sfree (vl); +} /* }}} void plugin_value_list_free */ + +static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */ +{ + value_list_t *vl; + + if (vl_orig == NULL) + return (NULL); + + vl = malloc (sizeof (*vl)); + if (vl == NULL) + return (NULL); + memcpy (vl, vl_orig, sizeof (*vl)); + + vl->values = calloc (vl_orig->values_len, sizeof (*vl->values)); + if (vl->values == NULL) + { + plugin_value_list_free (vl); + return (NULL); + } + memcpy (vl->values, vl_orig->values, + vl_orig->values_len * sizeof (*vl->values)); + + vl->meta = meta_data_clone (vl->meta); + if ((vl_orig->meta != NULL) && (vl->meta == NULL)) + { + plugin_value_list_free (vl); + return (NULL); + } + + if (vl->time == 0) + vl->time = cdtime (); + + /* Fill in the interval from the thread context, if it is zero. */ + if (vl->interval == 0) + { + plugin_ctx_t ctx = plugin_get_ctx (); + + if (ctx.interval != 0) + vl->interval = ctx.interval; + else + { + char name[6 * DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + ERROR ("plugin_value_list_clone: Unable to determine " + "interval from context for " + "value list \"%s\". " + "This indicates a broken plugin. " + "Please report this problem to the " + "collectd mailing list or at " + ".", name); + vl->interval = cf_get_default_interval (); + } + } + + return (vl); +} /* }}} value_list_t *plugin_value_list_clone */ + +static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ +{ + write_queue_t *q; + + q = malloc (sizeof (*q)); + if (q == NULL) + return (ENOMEM); + q->next = NULL; + + q->vl = plugin_value_list_clone (vl); + if (q->vl == NULL) + { + sfree (q); + return (ENOMEM); + } + + /* Store context of caller (read plugin); otherwise, it would not be + * available to the write plugins when actually dispatching the + * value-list later on. */ + q->ctx = plugin_get_ctx (); + + pthread_mutex_lock (&write_lock); + + if (write_queue_tail == NULL) + { + write_queue_head = q; + write_queue_tail = q; + write_queue_length = 1; + } + else + { + write_queue_tail->next = q; + write_queue_tail = q; + write_queue_length += 1; + } + + pthread_cond_signal (&write_cond); + pthread_mutex_unlock (&write_lock); + + return (0); +} /* }}} int plugin_write_enqueue */ + +static value_list_t *plugin_write_dequeue (void) /* {{{ */ +{ + write_queue_t *q; + value_list_t *vl; + + pthread_mutex_lock (&write_lock); + + while (write_loop && (write_queue_head == NULL)) + pthread_cond_wait (&write_cond, &write_lock); + + if (write_queue_head == NULL) + { + pthread_mutex_unlock (&write_lock); + return (NULL); + } + + q = write_queue_head; + write_queue_head = q->next; + write_queue_length -= 1; + if (write_queue_head == NULL) { + write_queue_tail = NULL; + assert(0 == write_queue_length); + } + + pthread_mutex_unlock (&write_lock); + + (void) plugin_set_ctx (q->ctx); + + vl = q->vl; + sfree (q); + return (vl); +} /* }}} value_list_t *plugin_write_dequeue */ + +static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */ +{ + while (write_loop) + { + value_list_t *vl = plugin_write_dequeue (); + if (vl == NULL) + continue; + + plugin_dispatch_values_internal (vl); + + plugin_value_list_free (vl); + } + + pthread_exit (NULL); + return ((void *) 0); +} /* }}} void *plugin_write_thread */ + +static void start_write_threads (size_t num) /* {{{ */ +{ + size_t i; + + if (write_threads != NULL) + return; + + write_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); + if (write_threads == NULL) + { + ERROR ("plugin: start_write_threads: calloc failed."); + return; + } + + write_threads_num = 0; + for (i = 0; i < num; i++) + { + int status; + + status = pthread_create (write_threads + write_threads_num, + /* attr = */ NULL, + plugin_write_thread, + /* arg = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin: start_write_threads: pthread_create failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return; + } + + write_threads_num++; + } /* for (i) */ +} /* }}} void start_write_threads */ + +static void stop_write_threads (void) /* {{{ */ +{ + write_queue_t *q; + int i; + + if (write_threads == NULL) + return; + + INFO ("collectd: Stopping %zu write threads.", write_threads_num); + + pthread_mutex_lock (&write_lock); + write_loop = 0; + DEBUG ("plugin: stop_write_threads: Signalling `write_cond'"); + pthread_cond_broadcast (&write_cond); + pthread_mutex_unlock (&write_lock); + + for (i = 0; i < write_threads_num; i++) + { + if (pthread_join (write_threads[i], NULL) != 0) + { + ERROR ("plugin: stop_write_threads: pthread_join failed."); + } + write_threads[i] = (pthread_t) 0; + } + sfree (write_threads); + write_threads_num = 0; + + pthread_mutex_lock (&write_lock); + i = 0; + for (q = write_queue_head; q != NULL; ) + { + write_queue_t *q1 = q; + plugin_value_list_free (q->vl); + q = q->next; + sfree (q1); + i++; + } + write_queue_head = NULL; + write_queue_tail = NULL; + write_queue_length = 0; + pthread_mutex_unlock (&write_lock); + + if (i > 0) + { + WARNING ("plugin: %i value list%s left after shutting down " + "the write threads.", + i, (i == 1) ? " was" : "s were"); + } +} /* }}} void stop_write_threads */ + /* * Public functions */ @@ -550,8 +849,52 @@ void plugin_set_dir (const char *dir) } } +static _Bool plugin_is_loaded (char const *name) +{ + int status; + + if (plugins_loaded == NULL) + plugins_loaded = c_avl_create ((void *) strcasecmp); + assert (plugins_loaded != NULL); + + status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL); + return (status == 0); +} + +static int plugin_mark_loaded (char const *name) +{ + char *name_copy; + int status; + + name_copy = strdup (name); + if (name_copy == NULL) + return (ENOMEM); + + status = c_avl_insert (plugins_loaded, + /* key = */ name_copy, /* value = */ NULL); + return (status); +} + +static void plugin_free_loaded () +{ + void *key; + void *value; + + if (plugins_loaded == NULL) + return; + + while (c_avl_pick (plugins_loaded, &key, &value) == 0) + { + sfree (key); + assert (value == NULL); + } + + c_avl_destroy (plugins_loaded); + plugins_loaded = NULL; +} + #define BUFSIZE 512 -int plugin_load (const char *type) +int plugin_load (char const *plugin_name, uint32_t flags) { DIR *dh; const char *dir; @@ -563,17 +906,38 @@ int plugin_load (const char *type) struct dirent *de; int status; - DEBUG ("type = %s", type); + if (plugin_name == NULL) + return (EINVAL); + + /* Check if plugin is already loaded and don't do anything in this + * case. */ + if (plugin_is_loaded (plugin_name)) + return (0); dir = plugin_get_dir (); ret = 1; + /* + * XXX: Magic at work: + * + * Some of the language bindings, for example the Python and Perl + * plugins, need to be able to export symbols to the scripts they run. + * For this to happen, the "Globals" flag needs to be set. + * Unfortunately, this technical detail is hard to explain to the + * average user and she shouldn't have to worry about this, ideally. + * So in order to save everyone's sanity use a different default for a + * handful of special plugins. --octo + */ + if ((strcasecmp ("perl", plugin_name) == 0) + || (strcasecmp ("python", plugin_name) == 0)) + flags |= PLUGIN_FLAGS_GLOBAL; + /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the * type when matching the filename */ - status = ssnprintf (typename, sizeof (typename), "%s.so", type); + status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name); if ((status < 0) || ((size_t) status >= sizeof (typename))) { - WARNING ("snprintf: truncated: `%s.so'", type); + WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name); return (-1); } typename_len = strlen (typename); @@ -581,7 +945,7 @@ int plugin_load (const char *type) if ((dh = opendir (dir)) == NULL) { char errbuf[1024]; - ERROR ("opendir (%s): %s", dir, + ERROR ("plugin_load: opendir (%s) failed: %s", dir, sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -595,40 +959,47 @@ int plugin_load (const char *type) "%s/%s", dir, de->d_name); if ((status < 0) || ((size_t) status >= sizeof (filename))) { - WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name); + WARNING ("plugin_load: Filename too long: \"%s/%s\"", + dir, de->d_name); continue; } if (lstat (filename, &statbuf) == -1) { char errbuf[1024]; - WARNING ("stat %s: %s", filename, + WARNING ("plugin_load: stat (\"%s\") failed: %s", + filename, sstrerror (errno, errbuf, sizeof (errbuf))); continue; } else if (!S_ISREG (statbuf.st_mode)) { /* don't follow symlinks */ - WARNING ("stat %s: not a regular file", filename); + WARNING ("plugin_load: %s is not a regular file.", + filename); continue; } - if (plugin_load_file (filename) == 0) + status = plugin_load_file (filename, flags); + if (status == 0) { /* success */ + plugin_mark_loaded (plugin_name); ret = 0; break; } else { - fprintf (stderr, "Unable to load plugin %s.\n", type); + ERROR ("plugin_load: Load plugin \"%s\" failed with " + "status %i.", plugin_name, status); } } closedir (dh); - if (filename[0] == '\0') - fprintf (stderr, "Could not find plugin %s.\n", type); + if (filename[0] == 0) + ERROR ("plugin_load: Could not find plugin \"%s\" in %s", + plugin_name, dir); return (ret); } @@ -665,13 +1036,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) rf0 = arg0; rf1 = arg1; - if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec) - return (-1); - else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec) - return (1); - else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec) + if (rf0->rf_next_read < rf1->rf_next_read) return (-1); - else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec) + else if (rf0->rf_next_read > rf1->rf_next_read) return (1); else return (0); @@ -685,6 +1052,9 @@ static int plugin_insert_read (read_func_t *rf) int status; llentry_t *le; + rf->rf_next_read = cdtime (); + rf->rf_effective_interval = rf->rf_interval; + pthread_mutex_lock (&read_lock); if (read_list == NULL) @@ -709,6 +1079,17 @@ static int plugin_insert_read (read_func_t *rf) } } + le = llist_search (read_list, rf->rf_name); + if (le != NULL) + { + pthread_mutex_unlock (&read_lock); + WARNING ("The read function \"%s\" is already registered. " + "Check for duplicate \"LoadPlugin\" lines " + "in your configuration!", + rf->rf_name); + return (EINVAL); + } + le = llentry_create (rf->rf_name, rf); if (le == NULL) { @@ -729,6 +1110,8 @@ static int plugin_insert_read (read_func_t *rf) /* This does not fail. */ llist_append (read_list, le); + /* Wake up all the read threads. */ + pthread_cond_broadcast (&read_cond); pthread_mutex_unlock (&read_lock); return (0); } /* int plugin_insert_read */ @@ -737,52 +1120,59 @@ int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; + int status; - rf = (read_func_t *) malloc (sizeof (read_func_t)); + rf = malloc (sizeof (*rf)); if (rf == NULL) { - char errbuf[1024]; - ERROR ("plugin_register_read: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); + ERROR ("plugin_register_read: malloc failed."); + return (ENOMEM); } memset (rf, 0, sizeof (read_func_t)); rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + rf->rf_ctx = plugin_get_ctx (); + rf->rf_group[0] = '\0'; + rf->rf_name = strdup (name); rf->rf_type = RF_SIMPLE; - rf->rf_interval.tv_sec = 0; - rf->rf_interval.tv_nsec = 0; - rf->rf_effective_interval = rf->rf_interval; + rf->rf_interval = plugin_get_interval (); + + status = plugin_insert_read (rf); + if (status != 0) + sfree (rf); - return (plugin_insert_read (rf)); + return (status); } /* int plugin_register_read */ -int plugin_register_complex_read (const char *name, +int plugin_register_complex_read (const char *group, const char *name, plugin_read_cb callback, const struct timespec *interval, user_data_t *user_data) { read_func_t *rf; + int status; - rf = (read_func_t *) malloc (sizeof (read_func_t)); + rf = malloc (sizeof (*rf)); if (rf == NULL) { ERROR ("plugin_register_complex_read: malloc failed."); - return (-1); + return (ENOMEM); } memset (rf, 0, sizeof (read_func_t)); rf->rf_callback = (void *) callback; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + if (group != NULL) + sstrncpy (rf->rf_group, group, sizeof (rf->rf_group)); + else + rf->rf_group[0] = '\0'; + rf->rf_name = strdup (name); rf->rf_type = RF_COMPLEX; if (interval != NULL) - { - rf->rf_interval = *interval; - } - rf->rf_effective_interval = rf->rf_interval; + rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval); + else + rf->rf_interval = plugin_get_interval (); /* Set user data */ if (user_data == NULL) @@ -795,7 +1185,13 @@ int plugin_register_complex_read (const char *name, rf->rf_udata = *user_data; } - return (plugin_insert_read (rf)); + rf->rf_ctx = plugin_get_ctx (); + + status = plugin_insert_read (rf); + if (status != 0) + sfree (rf); + + return (status); } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, @@ -812,13 +1208,41 @@ int plugin_register_flush (const char *name, (void *) callback, ud)); } /* int plugin_register_flush */ -int plugin_register_shutdown (char *name, +int plugin_register_missing (const char *name, + plugin_missing_cb callback, user_data_t *ud) +{ + return (create_register_callback (&list_missing, name, + (void *) callback, ud)); +} /* int plugin_register_missing */ + +int plugin_register_shutdown (const char *name, int (*callback) (void)) { return (create_register_callback (&list_shutdown, name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -927,6 +1351,67 @@ int plugin_unregister_read (const char *name) /* {{{ */ return (0); } /* }}} int plugin_unregister_read */ +static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */ +{ + read_func_t *rf = e->value; + char *group = ud; + + return strcmp (rf->rf_group, (const char *)group); +} /* }}} int compare_read_func_group */ + +int plugin_unregister_read_group (const char *group) /* {{{ */ +{ + llentry_t *le; + read_func_t *rf; + + int found = 0; + + if (group == NULL) + return (-ENOENT); + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + return (-ENOENT); + } + + while (42) + { + le = llist_search_custom (read_list, + compare_read_func_group, (void *)group); + + if (le == NULL) + break; + + ++found; + + llist_remove (read_list, le); + + rf = le->value; + assert (rf != NULL); + rf->rf_type = RF_REMOVE; + + llentry_destroy (le); + + DEBUG ("plugin_unregister_read_group: " + "Marked `%s' (group `%s') for removal.", + rf->rf_name, group); + } + + pthread_mutex_unlock (&read_lock); + + if (found == 0) + { + WARNING ("plugin_unregister_read_group: No such " + "group of read function: %s", group); + return (-ENOENT); + } + + return (0); +} /* }}} int plugin_unregister_read_group */ + int plugin_unregister_write (const char *name) { return (plugin_unregister (list_write, name)); @@ -937,6 +1422,11 @@ int plugin_unregister_flush (const char *name) return (plugin_unregister (list_flush, name)); } +int plugin_unregister_missing (const char *name) +{ + return (plugin_unregister (list_missing, name)); +} + int plugin_unregister_shutdown (const char *name) { return (plugin_unregister (list_shutdown, name)); @@ -970,7 +1460,8 @@ int plugin_unregister_notification (const char *name) void plugin_init_all (void) { - const char *chain_name; + char const *chain_name; + long write_threads_num; llentry_t *le; int status; @@ -983,6 +1474,37 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + write_limit_high = global_option_get_long ("WriteQueueLimitHigh", + /* default = */ 0); + if (write_limit_high < 0) + { + ERROR ("WriteQueueLimitHigh must be positive or zero."); + write_limit_high = 0; + } + + write_limit_low = global_option_get_long ("WriteQueueLimitLow", + /* default = */ write_limit_high / 2); + if (write_limit_low < 0) + { + ERROR ("WriteQueueLimitLow must be positive or zero."); + write_limit_low = write_limit_high / 2; + } + else if (write_limit_low > write_limit_high) + { + ERROR ("WriteQueueLimitLow must not be larger than " + "WriteQueueLimitHigh."); + write_limit_low = write_limit_high; + } + + write_threads_num = global_option_get_long ("WriteThreads", + /* default = */ 5); + if (write_threads_num < 1) + { + ERROR ("WriteThreads must be positive."); + write_threads_num = 5; + } + + start_write_threads ((size_t) write_threads_num); if ((list_init == NULL) && (read_heap == NULL)) return; @@ -995,10 +1517,13 @@ void plugin_init_all (void) { callback_func_t *cf; plugin_init_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (); + plugin_set_ctx (old_ctx); if (status != 0) { @@ -1051,11 +1576,14 @@ int plugin_read_all_once (void) while (42) { read_func_t *rf; + plugin_ctx_t old_ctx; - rf = c_head_get_root (read_heap); + rf = c_heap_get_root (read_heap); if (rf == NULL) break; + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf->rf_type == RF_SIMPLE) { int (*callback) (void); @@ -1071,6 +1599,8 @@ int plugin_read_all_once (void) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + if (status != 0) { NOTICE ("read-function of plugin `%s' failed.", @@ -1117,6 +1647,9 @@ int plugin_write (const char *plugin, /* {{{ */ callback_func_t *cf = le->value; plugin_write_cb callback; + /* do not switch plugin context; rather keep the context (interval) + * information of the calling read plugin */ + DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; status = (*callback) (ds, vl, &cf->cf_udata); @@ -1152,6 +1685,9 @@ int plugin_write (const char *plugin, /* {{{ */ cf = le->value; + /* do not switch plugin context; rather keep the context (interval) + * information of the calling read plugin */ + DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; status = (*callback) (ds, vl, &cf->cf_udata); @@ -1160,7 +1696,7 @@ int plugin_write (const char *plugin, /* {{{ */ return (status); } /* }}} int plugin_write */ -int plugin_flush (const char *plugin, int timeout, const char *identifier) +int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) { llentry_t *le; @@ -1172,6 +1708,7 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier) { callback_func_t *cf; plugin_flush_cb callback; + plugin_ctx_t old_ctx; if ((plugin != NULL) && (strcmp (plugin, le->key) != 0)) @@ -1181,10 +1718,13 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier) } cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; (*callback) (timeout, identifier, &cf->cf_udata); + plugin_set_ctx (old_ctx); + le = le->next; } return (0); @@ -1205,7 +1745,8 @@ void plugin_shutdown_all (void) destroy_read_heap (); - plugin_flush (/* plugin = */ NULL, /* timeout = */ -1, + plugin_flush (/* plugin = */ NULL, + /* timeout = */ 0, /* identifier = */ NULL); le = NULL; @@ -1216,8 +1757,10 @@ void plugin_shutdown_all (void) { callback_func_t *cf; plugin_shutdown_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; /* Advance the pointer before calling the callback allows @@ -1227,22 +1770,71 @@ void plugin_shutdown_all (void) le = le->next; (*callback) (); + + plugin_set_ctx (old_ctx); } + stop_write_threads (); + /* Write plugins which use the `user_data' pointer usually need the * same data available to the flush callback. If this is the case, set * the free_function to NULL when registering the flush callback and to * the real free function when registering the write callback. This way * the data isn't freed twice. */ destroy_all_callbacks (&list_flush); + destroy_all_callbacks (&list_missing); destroy_all_callbacks (&list_write); destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); + + plugin_free_loaded (); + plugin_free_data_sets (); } /* void plugin_shutdown_all */ -int plugin_dispatch_values (value_list_t *vl) +int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ +{ + llentry_t *le; + + if (list_missing == NULL) + return (0); + + le = llist_head (list_missing); + while (le != NULL) + { + callback_func_t *cf; + plugin_missing_cb callback; + plugin_ctx_t old_ctx; + int status; + + cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); + callback = cf->cf_callback; + + status = (*callback) (vl, &cf->cf_udata); + plugin_set_ctx (old_ctx); + if (status != 0) + { + if (status < 0) + { + ERROR ("plugin_dispatch_missing: Callback function \"%s\" " + "failed with status %i.", + le->key, status); + return (status); + } + else + { + return (0); + } + } + + le = le->next; + } + return (0); +} /* int }}} plugin_dispatch_missing */ + +static int plugin_dispatch_values_internal (value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; @@ -1257,7 +1849,8 @@ int plugin_dispatch_values (value_list_t *vl) if ((vl == NULL) || (vl->type[0] == 0) || (vl->values == NULL) || (vl->values_len < 1)) { - ERROR ("plugin_dispatch_values: Invalid value list."); + ERROR ("plugin_dispatch_values: Invalid value list " + "from plugin %s.", vl->plugin); return (-1); } @@ -1283,21 +1876,26 @@ int plugin_dispatch_values (value_list_t *vl) if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0) { - INFO ("plugin_dispatch_values: Dataset not found: %s", vl->type); + char ident[6 * DATA_MAX_NAME_LEN]; + + FORMAT_VL (ident, sizeof (ident), vl); + INFO ("plugin_dispatch_values: Dataset not found: %s " + "(from \"%s\"), check your types.db!", + vl->type, ident); return (-1); } - if (vl->time == 0) - vl->time = time (NULL); - - if (vl->interval <= 0) - vl->interval = interval_g; + /* Assured by plugin_value_list_clone(). The time is determined at + * _enqueue_ time. */ + assert (vl->time != 0); + assert (vl->interval != 0); - DEBUG ("plugin_dispatch_values: time = %u; interval = %i; " + DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " "plugin = %s; plugin_instance = %s; " "type = %s; type_instance = %s;", - (unsigned int) vl->time, vl->interval, + CDTIME_T_TO_DOUBLE (vl->time), + CDTIME_T_TO_DOUBLE (vl->interval), vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance); @@ -1381,9 +1979,6 @@ int plugin_dispatch_values (value_list_t *vl) /* Update the value cache */ uc_update (ds, vl); - /* Initiate threshold checking */ - ut_check_threshold (ds, vl); - if (post_cache_chain != NULL) { status = fc_process_chain (ds, vl, post_cache_chain); @@ -1414,7 +2009,149 @@ int plugin_dispatch_values (value_list_t *vl) } return (0); -} /* int plugin_dispatch_values */ +} /* int plugin_dispatch_values_internal */ + +static double get_drop_probability (void) /* {{{ */ +{ + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; + int status; + + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) + { + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); + } + pthread_mutex_unlock (&last_message_lock); + } + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); + else + return (0); +} /* }}} _Bool check_drop_value */ + +int plugin_dispatch_values (value_list_t const *vl) +{ + int status; + + if (check_drop_value ()) + return (0); + + status = plugin_write_enqueue (vl); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return (status); + } + + return (0); +} + +__attribute__((sentinel)) +int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */ + _Bool store_percentage, ...) +{ + value_list_t *vl; + int failed = 0; + gauge_t sum = 0.0; + va_list ap; + + assert (template->values_len == 1); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + gauge_t value; + + name = va_arg (ap, char const *); + if (name == NULL) + break; + + value = va_arg (ap, gauge_t); + if (!isnan (value)) + sum += value; + } + va_end (ap); + + vl = plugin_value_list_clone (template); + /* plugin_value_list_clone makes sure vl->time is set to non-zero. */ + if (store_percentage) + sstrncpy (vl->type, "percent", sizeof (vl->type)); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + int status; + + /* Set the type instance. */ + name = va_arg (ap, char const *); + if (name == NULL) + break; + sstrncpy (vl->type_instance, name, sizeof (vl->type_instance)); + + /* Set the value. */ + vl->values[0].gauge = va_arg (ap, gauge_t); + if (store_percentage) + vl->values[0].gauge *= 100.0 / sum; + + status = plugin_write_enqueue (vl); + if (status != 0) + failed++; + } + va_end (ap); + + plugin_value_list_free (vl); + return (failed); +} /* }}} int plugin_dispatch_multivalue */ int plugin_dispatch_notification (const notification_t *notif) { @@ -1422,9 +2159,9 @@ int plugin_dispatch_notification (const notification_t *notif) /* Possible TODO: Add flap detection here */ DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; " - "time = %u; host = %s;", + "time = %.3f; host = %s;", notif->severity, notif->message, - (unsigned int) notif->time, notif->host); + CDTIME_T_TO_DOUBLE (notif->time), notif->host); /* Nobody cares for notifications */ if (list_notification == NULL) @@ -1437,6 +2174,9 @@ int plugin_dispatch_notification (const notification_t *notif) plugin_notification_cb callback; int status; + /* do not switch plugin context; rather keep the context + * (interval) information of the calling plugin */ + cf = le->value; callback = cf->cf_callback; status = (*callback) (notif, &cf->cf_udata); @@ -1484,16 +2224,63 @@ void plugin_log (int level, const char *format, ...) cf = le->value; callback = cf->cf_callback; + /* do not switch plugin context; rather keep the context + * (interval) information of the calling plugin */ + (*callback) (level, msg, &cf->cf_udata); le = le->next; } } /* void plugin_log */ +int parse_log_severity (const char *severity) +{ + int log_level = -1; + + if ((0 == strcasecmp (severity, "emerg")) + || (0 == strcasecmp (severity, "alert")) + || (0 == strcasecmp (severity, "crit")) + || (0 == strcasecmp (severity, "err"))) + log_level = LOG_ERR; + else if (0 == strcasecmp (severity, "warning")) + log_level = LOG_WARNING; + else if (0 == strcasecmp (severity, "notice")) + log_level = LOG_NOTICE; + else if (0 == strcasecmp (severity, "info")) + log_level = LOG_INFO; +#if COLLECT_DEBUG + else if (0 == strcasecmp (severity, "debug")) + log_level = LOG_DEBUG; +#endif /* COLLECT_DEBUG */ + + return (log_level); +} /* int parse_log_severity */ + +int parse_notif_severity (const char *severity) +{ + int notif_severity = -1; + + if (strcasecmp (severity, "FAILURE") == 0) + notif_severity = NOTIF_FAILURE; + else if (strcmp (severity, "OKAY") == 0) + notif_severity = NOTIF_OKAY; + else if ((strcmp (severity, "WARNING") == 0) + || (strcmp (severity, "WARN") == 0)) + notif_severity = NOTIF_WARNING; + + return (notif_severity); +} /* int parse_notif_severity */ + const data_set_t *plugin_get_ds (const char *name) { data_set_t *ds; + if (data_sets == NULL) + { + ERROR ("plugin_get_ds: No data sets are defined yet."); + return (NULL); + } + if (c_avl_get (data_sets, name, (void *) &ds) != 0) { DEBUG ("No such dataset registered: %s", name); @@ -1558,7 +2345,7 @@ static int plugin_notification_meta_add (notification_t *n, } case NM_TYPE_BOOLEAN: { - meta->nm_value.nm_boolean = *((bool *) value); + meta->nm_value.nm_boolean = *((_Bool *) value); break; } default: @@ -1612,7 +2399,7 @@ int plugin_notification_meta_add_double (notification_t *n, int plugin_notification_meta_add_boolean (notification_t *n, const char *name, - bool value) + _Bool value) { return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value)); } @@ -1678,4 +2465,122 @@ int plugin_notification_meta_free (notification_meta_t *n) return (0); } /* int plugin_notification_meta_free */ +static void plugin_ctx_destructor (void *ctx) +{ + sfree (ctx); +} /* void plugin_ctx_destructor */ + +static plugin_ctx_t ctx_init = { /* interval = */ 0 }; + +static plugin_ctx_t *plugin_ctx_create (void) +{ + plugin_ctx_t *ctx; + + ctx = malloc (sizeof (*ctx)); + if (ctx == NULL) { + char errbuf[1024]; + ERROR ("Failed to allocate plugin context: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return NULL; + } + + *ctx = ctx_init; + assert (plugin_ctx_key_initialized); + pthread_setspecific (plugin_ctx_key, ctx); + DEBUG("Created new plugin context."); + return (ctx); +} /* int plugin_ctx_create */ + +void plugin_init_ctx (void) +{ + pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor); + plugin_ctx_key_initialized = 1; +} /* void plugin_init_ctx */ + +plugin_ctx_t plugin_get_ctx (void) +{ + plugin_ctx_t *ctx; + + assert (plugin_ctx_key_initialized); + ctx = pthread_getspecific (plugin_ctx_key); + + if (ctx == NULL) { + ctx = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (ctx == NULL) + return ctx_init; + } + + return (*ctx); +} /* plugin_ctx_t plugin_get_ctx */ + +plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx) +{ + plugin_ctx_t *c; + plugin_ctx_t old; + + assert (plugin_ctx_key_initialized); + c = pthread_getspecific (plugin_ctx_key); + + if (c == NULL) { + c = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (c == NULL) + return ctx_init; + } + + old = *c; + *c = ctx; + + return (old); +} /* void plugin_set_ctx */ + +cdtime_t plugin_get_interval (void) +{ + cdtime_t interval; + + interval = plugin_get_ctx().interval; + if (interval > 0) + return interval; + + return cf_get_default_interval (); +} /* cdtime_t plugin_get_interval */ + +typedef struct { + plugin_ctx_t ctx; + void *(*start_routine) (void *); + void *arg; +} plugin_thread_t; + +static void *plugin_thread_start (void *arg) +{ + plugin_thread_t *plugin_thread = arg; + + void *(*start_routine) (void *) = plugin_thread->start_routine; + void *plugin_arg = plugin_thread->arg; + + plugin_set_ctx (plugin_thread->ctx); + + free (plugin_thread); + + return start_routine (plugin_arg); +} /* void *plugin_thread_start */ + +int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg) +{ + plugin_thread_t *plugin_thread; + + plugin_thread = malloc (sizeof (*plugin_thread)); + if (plugin_thread == NULL) + return -1; + + plugin_thread->ctx = plugin_get_ctx (); + plugin_thread->start_routine = start_routine; + plugin_thread->arg = arg; + + return pthread_create (thread, attr, + plugin_thread_start, plugin_thread); +} /* int plugin_thread_create */ + /* vim: set sw=8 ts=8 noet fdm=marker : */